llkv_column_map/store/projection.rs

1//! Projection planners for building Arrow batches from column chunks.
2//!
3//! The projection subsystem wires descriptor metadata, chunk loading helpers,
4//! and gather routines to materialize user-facing result sets. It centralizes
5//! null-handling policies and keeps the hot paths monomorphized per Arrow type.
6
7use super::*;
8use crate::gather::{
9    RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
10    gather_rows_from_chunks as shared_gather_rows_from_chunks,
11    gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
12    gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
13    gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
14    gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
15    gather_rows_single_shot as shared_gather_rows_single_shot,
16    gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
17    gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
18    gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
19    gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
20};
21use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
22use crate::types::{LogicalFieldId, RowId};
23use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
24use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
25use arrow::record_batch::RecordBatch;
26use llkv_result::{Error, Result};
27use llkv_storage::{
28    pager::{BatchGet, GetResult, Pager},
29    serialization::deserialize_array,
30    types::PhysicalKey,
31};
32use rustc_hash::{FxHashMap, FxHashSet};
33use simd_r_drive_entry_handle::EntryHandle;
34use std::borrow::Cow;
35use std::sync::Arc;
36
/// Policy deciding how a gather treats requested row IDs that are absent
/// (or, for [`GatherNullPolicy::DropNulls`], rows that are entirely null).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Any missing row ID results in an error.
    ErrorOnMissing,
    /// Missing rows surface as nulls in the result arrays.
    IncludeNulls,
    /// Missing rows and rows where all projected columns are null are dropped from the output.
    DropNulls,
}
46
/// A single requested output column: the logical field to materialize plus an
/// optional output name for it.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    /// Logical column to project.
    pub logical_field_id: LogicalFieldId,
    /// Optional output name override for the projected column.
    pub alias: Option<String>,
}
52
53impl Projection {
54    pub fn new(logical_field_id: LogicalFieldId) -> Self {
55        Self {
56            logical_field_id,
57            alias: None,
58        }
59    }
60
61    pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
62        Self {
63            logical_field_id,
64            alias: Some(alias.into()),
65        }
66    }
67}
68
69impl From<LogicalFieldId> for Projection {
70    fn from(logical_field_id: LogicalFieldId) -> Self {
71        Projection::new(logical_field_id)
72    }
73}
74
75impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
76    fn from(value: (LogicalFieldId, S)) -> Self {
77        Projection::with_alias(value.0, value.1)
78    }
79}
80
81impl GatherNullPolicy {
82    #[inline]
83    fn allow_missing(self) -> bool {
84        !matches!(self, Self::ErrorOnMissing)
85    }
86}
87
/// Reusable state for repeated multi-column gathers.
///
/// Retains decoded chunk arrays and scratch buffers across calls so chunk
/// fetch/decode costs are amortized by `gather_rows_with_reusable_context`.
pub struct MultiGatherContext {
    /// Projected fields paired with their Arrow data types.
    field_infos: Vec<(LogicalFieldId, DataType)>,
    /// Pre-built Arrow fields (nullable by default) parallel to `field_infos`.
    fields: Vec<Field>,
    /// Per-field chunk metadata and candidate chunk indices.
    plans: Vec<FieldPlan>,
    /// Cache of decoded chunk arrays keyed by physical key.
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    /// Scratch map from row id to output position (lent out per gather call).
    row_index: FxHashMap<u64, usize>,
    /// Per-row scratch slots consumed by the shared gather routines.
    row_scratch: Vec<Option<(usize, usize)>>,
    /// Scratch list of chunk keys needed by the current gather.
    chunk_keys: Vec<PhysicalKey>,
}
97
98impl MultiGatherContext {
99    fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
100        // Build Field objects from field_infos with default nullable=true
101        // This is for backward compatibility when no expected schema is provided
102        let fields: Vec<Field> = field_infos
103            .iter()
104            .map(|(fid, dtype)| {
105                let field_name = format!("field_{}", u64::from(*fid));
106                Field::new(field_name, dtype.clone(), true)
107            })
108            .collect();
109
110        Self {
111            chunk_cache: FxHashMap::default(),
112            row_index: FxHashMap::default(),
113            row_scratch: Vec::new(),
114            chunk_keys: Vec::new(),
115            field_infos,
116            fields,
117            plans,
118        }
119    }
120
121    #[inline]
122    pub fn is_empty(&self) -> bool {
123        self.plans.is_empty()
124    }
125
126    #[inline]
127    fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
128        &self.field_infos
129    }
130
131    #[inline]
132    fn fields(&self) -> &[Field] {
133        &self.fields
134    }
135
136    #[inline]
137    fn plans(&self) -> &[FieldPlan] {
138        &self.plans
139    }
140
141    #[inline]
142    fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
143        &self.chunk_cache
144    }
145
146    #[inline]
147    fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
148        &mut self.chunk_cache
149    }
150
151    #[inline]
152    fn plans_mut(&mut self) -> &mut [FieldPlan] {
153        &mut self.plans
154    }
155
156    fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
157        std::mem::take(&mut self.chunk_keys)
158    }
159
160    fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
161        self.chunk_keys = keys;
162    }
163
164    fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
165        std::mem::take(&mut self.row_index)
166    }
167
168    fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
169        self.row_index = row_index;
170    }
171
172    fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
173        std::mem::take(&mut self.row_scratch)
174    }
175
176    fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
177        self.row_scratch = scratch;
178    }
179
180    pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
181        let first_plan = self.plans.first()?;
182        let mut chunk_idx = None;
183        for (idx, meta) in first_plan.row_metas.iter().enumerate() {
184            if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
185                chunk_idx = Some(idx);
186                break;
187            }
188        }
189        if chunk_idx.is_none() {
190            let total_chunks = first_plan.row_metas.len();
191            'outer: for idx in 0..total_chunks {
192                for plan in &self.plans {
193                    let meta = &plan.row_metas[idx];
194                    if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
195                        chunk_idx = Some(idx);
196                        break 'outer;
197                    }
198                }
199            }
200        }
201        let idx = chunk_idx?;
202
203        let mut span_min = u64::MAX;
204        let mut span_max = 0u64;
205        for plan in &self.plans {
206            let meta = plan.row_metas.get(idx)?;
207            span_min = span_min.min(meta.min_val_u64);
208            span_max = span_max.max(meta.max_val_u64);
209        }
210
211        if span_min > span_max {
212            return None;
213        }
214
215        Some((idx, span_min, span_max))
216    }
217}
218
/// Per-column gather plan: chunk metadata for the value column and its paired
/// row-id column, plus the chunk indices that may contain requested rows.
#[derive(Clone, Debug)]
struct FieldPlan {
    /// Arrow type of the projected values.
    dtype: DataType,
    /// Metadata for each non-empty value chunk.
    value_metas: Vec<ChunkMetadata>,
    /// Metadata for each non-empty row-id chunk (kept in lockstep with `value_metas`).
    row_metas: Vec<ChunkMetadata>,
    /// Indices into the meta vectors whose row-id range may intersect the current request.
    candidate_indices: Vec<usize>,
}
226
227impl<P> ColumnStore<P>
228where
229    P: Pager<Blob = EntryHandle> + Send + Sync,
230{
    /// Gathers multiple columns using a configurable null-handling policy.
    ///
    /// Convenience forwarder to [`Self::gather_rows_with_schema`] with no
    /// expected schema. When [`GatherNullPolicy::DropNulls`] is selected,
    /// rows where all projected columns are null or missing are removed
    /// from the resulting batch.
    pub fn gather_rows(
        &self,
        field_ids: &[LogicalFieldId],
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        self.gather_rows_with_schema(field_ids, row_ids, policy, None)
    }
243
    /// Gather rows with an optional expected schema for empty result sets.
    ///
    /// When `expected_schema` is provided and `row_ids` is empty, the returned
    /// RecordBatch will use that schema instead of synthesizing one with all nullable fields.
    /// This ensures that non-nullable columns (e.g., PRIMARY KEYs) are correctly represented.
    ///
    /// Builds a fresh [`MultiGatherContext`] per call; use
    /// `gather_rows_with_reusable_context` to amortize that cost across calls.
    pub fn gather_rows_with_schema(
        &self,
        field_ids: &[LogicalFieldId],
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        let mut ctx = self.prepare_gather_context(field_ids)?;
        self.execute_gather_single_pass_with_schema(&mut ctx, row_ids, policy, expected_schema)
    }
259
    /// Executes a one-off gather with optional schema for empty result sets.
    ///
    /// Resolves candidate chunks per field, batch-fetches their raw blobs from
    /// the pager, then dispatches to a type-specialized single-shot gather per
    /// column. With [`GatherNullPolicy::DropNulls`], all-null rows are
    /// filtered from the outputs before the batch is assembled.
    fn execute_gather_single_pass_with_schema(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        if row_ids.is_empty() {
            // Use expected_schema if provided to preserve nullability
            let (schema, arrays) = if let Some(expected_schema) = expected_schema {
                let mut arrays = Vec::with_capacity(expected_schema.fields().len());
                for field in expected_schema.fields() {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (expected_schema, arrays)
            } else {
                // Fallback: Use fields from context which have nullable=true by default
                // This is safe because nullable=true is always compatible
                let fields = ctx.fields();
                let mut arrays = Vec::with_capacity(fields.len());
                for field in fields {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (Arc::new(Schema::new(fields.to_vec())), arrays)
            };
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map each requested row id to its output position; duplicates are
        // rejected because every output slot must have exactly one source row.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        // A sorted copy enables range-overlap pruning in chunk_intersects.
        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Collect the physical keys of every value/row-id chunk whose row-id
        // range may intersect the request, recording candidates per plan.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        // Fetch all required chunk blobs from the pager in one batch call.
        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Gather each column via a helper monomorphized for its Arrow type.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        // Derive output nullability: IncludeNulls always reports nullable;
        // otherwise a field is nullable only if nulls actually materialized.
        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }
429
    /// Resolves descriptors and chunk metadata for `field_ids`, producing a
    /// [`MultiGatherContext`] that later gather calls can reuse.
    ///
    /// # Errors
    /// Returns [`Error::NotFound`] when a field (or its paired row-id column)
    /// has no catalog entry or descriptor blob, and an internal error when a
    /// field's value and row-id chunk counts disagree.
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Look up the physical keys of each value column and its paired
        // row-id column, then release the catalog lock before paging.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        // Fetch every descriptor blob in a single pager round trip.
        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        // Build one FieldPlan per projected column; value and row-id chunk
        // chains must stay in lockstep for the gather routines.
        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }
493
    /// Gathers rows while reusing chunk caches and scratch buffers stored in the context.
    ///
    /// This path amortizes chunk fetch and decode costs across multiple calls by
    /// retaining Arrow arrays and scratch state inside the provided context.
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        if row_ids.is_empty() {
            // Empty request: emit an empty batch with synthesized nullable fields.
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Borrow the context's scratch structures for the duration of the
        // call; they are stored back (even on error) before returning.
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        // The closure lets `?` propagate errors while the tail of this
        // function still restores the scratch buffers into the context.
        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            // Avoid re-sorting when the caller already passed sorted ids.
            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // A strictly consecutive id run allows a dense locator that
            // skips building the hash map entirely.
            let dense_base = if len == 0 {
                None
            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            // Record candidate chunk indices per plan and the keys to load.
            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch and decode only the chunks not already in the cache.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            // Gather each column via its type-specialized chunk-cache helper.
            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            // Nullability mirrors the single-pass path: IncludeNulls always
            // reports nullable, otherwise only when nulls materialized.
            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Hand the scratch structures back so the next call reuses their
        // allocations.
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }
730
731    fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
732        let mut metas = Vec::new();
733        if head_page_pk == 0 {
734            return Ok(metas);
735        }
736        for meta in DescriptorIterator::new(pager, head_page_pk) {
737            let meta = meta?;
738            if meta.row_count > 0 {
739                metas.push(meta);
740            }
741        }
742        Ok(metas)
743    }
744
745    #[inline]
746    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
747        if sorted_row_ids.is_empty() || meta.row_count == 0 {
748            return false;
749        }
750        let min = meta.min_val_u64;
751        let max = meta.max_val_u64;
752        if min == 0 && max == 0 && meta.row_count > 0 {
753            return true;
754        }
755        if min > max {
756            return true;
757        }
758        let min_req = sorted_row_ids[0];
759        let max_req = *sorted_row_ids.last().unwrap();
760        if max < min_req || min > max_req {
761            return false;
762        }
763        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
764        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
765    }
766
767    fn gather_rows_single_shot_string<O>(
768        row_index: &FxHashMap<u64, usize>,
769        len: usize,
770        plan: &FieldPlan,
771        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
772        allow_missing: bool,
773    ) -> Result<ArrayRef>
774    where
775        O: OffsetSizeTrait,
776    {
777        shared_gather_rows_single_shot_string::<O>(
778            row_index,
779            len,
780            &plan.value_metas,
781            &plan.row_metas,
782            &plan.candidate_indices,
783            chunk_blobs,
784            allow_missing,
785        )
786    }
787
788    #[allow(clippy::too_many_arguments)]
789    fn gather_rows_single_shot_bool(
790        row_index: &FxHashMap<u64, usize>,
791        len: usize,
792        plan: &FieldPlan,
793        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
794        allow_missing: bool,
795    ) -> Result<ArrayRef> {
796        shared_gather_rows_single_shot_bool(
797            row_index,
798            len,
799            &plan.value_metas,
800            &plan.row_metas,
801            &plan.candidate_indices,
802            chunk_blobs,
803            allow_missing,
804        )
805    }
806
807    fn gather_rows_single_shot_struct(
808        row_index: &FxHashMap<u64, usize>,
809        len: usize,
810        plan: &FieldPlan,
811        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
812        allow_missing: bool,
813        dtype: &DataType,
814    ) -> Result<ArrayRef> {
815        shared_gather_rows_single_shot_struct(
816            row_index,
817            len,
818            &plan.value_metas,
819            &plan.row_metas,
820            &plan.candidate_indices,
821            chunk_blobs,
822            allow_missing,
823            dtype,
824        )
825    }
826
827    #[allow(clippy::too_many_arguments)]
828    fn gather_rows_single_shot<T>(
829        row_index: &FxHashMap<u64, usize>,
830        len: usize,
831        plan: &FieldPlan,
832        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
833        allow_missing: bool,
834    ) -> Result<ArrayRef>
835    where
836        T: ArrowPrimitiveType,
837    {
838        shared_gather_rows_single_shot::<T>(
839            row_index,
840            len,
841            &plan.value_metas,
842            &plan.row_metas,
843            &plan.candidate_indices,
844            chunk_blobs,
845            allow_missing,
846        )
847    }
848
849    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
850    fn gather_rows_from_chunks_string<O>(
851        row_ids: &[u64],
852        row_locator: RowLocator,
853        len: usize,
854        candidate_indices: &[usize],
855        plan: &FieldPlan,
856        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
857        row_scratch: &mut [Option<(usize, usize)>],
858        allow_missing: bool,
859    ) -> Result<ArrayRef>
860    where
861        O: OffsetSizeTrait,
862    {
863        shared_gather_rows_from_chunks_string::<O>(
864            row_ids,
865            row_locator,
866            len,
867            candidate_indices,
868            &plan.value_metas,
869            &plan.row_metas,
870            chunk_arrays,
871            row_scratch,
872            allow_missing,
873        )
874    }
875
876    fn gather_rows_single_shot_binary<O>(
877        row_index: &FxHashMap<u64, usize>,
878        len: usize,
879        plan: &FieldPlan,
880        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
881        allow_missing: bool,
882    ) -> Result<ArrayRef>
883    where
884        O: OffsetSizeTrait,
885    {
886        shared_gather_rows_single_shot_binary::<O>(
887            row_index,
888            len,
889            &plan.value_metas,
890            &plan.row_metas,
891            &plan.candidate_indices,
892            chunk_blobs,
893            allow_missing,
894        )
895    }
896
897    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
898    fn gather_rows_from_chunks_binary<O>(
899        row_ids: &[u64],
900        row_locator: RowLocator,
901        len: usize,
902        candidate_indices: &[usize],
903        plan: &FieldPlan,
904        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
905        row_scratch: &mut [Option<(usize, usize)>],
906        allow_missing: bool,
907    ) -> Result<ArrayRef>
908    where
909        O: OffsetSizeTrait,
910    {
911        shared_gather_rows_from_chunks_binary::<O>(
912            row_ids,
913            row_locator,
914            len,
915            candidate_indices,
916            &plan.value_metas,
917            &plan.row_metas,
918            chunk_arrays,
919            row_scratch,
920            allow_missing,
921        )
922    }
923
924    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
925    fn gather_rows_from_chunks_bool(
926        row_ids: &[u64],
927        row_locator: RowLocator,
928        len: usize,
929        candidate_indices: &[usize],
930        plan: &FieldPlan,
931        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
932        row_scratch: &mut [Option<(usize, usize)>],
933        allow_missing: bool,
934    ) -> Result<ArrayRef> {
935        shared_gather_rows_from_chunks_bool(
936            row_ids,
937            row_locator,
938            len,
939            candidate_indices,
940            &plan.value_metas,
941            &plan.row_metas,
942            chunk_arrays,
943            row_scratch,
944            allow_missing,
945        )
946    }
947
948    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
949    fn gather_rows_from_chunks_struct(
950        row_locator: RowLocator,
951        len: usize,
952        candidate_indices: &[usize],
953        plan: &FieldPlan,
954        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
955        row_scratch: &mut [Option<(usize, usize)>],
956        allow_missing: bool,
957        dtype: &DataType,
958    ) -> Result<ArrayRef> {
959        shared_gather_rows_from_chunks_struct(
960            row_locator,
961            len,
962            candidate_indices,
963            &plan.value_metas,
964            &plan.row_metas,
965            chunk_arrays,
966            row_scratch,
967            allow_missing,
968            dtype,
969        )
970    }
971
972    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and keeps type monomorphization straightforward.
973    fn gather_rows_from_chunks<T>(
974        row_ids: &[u64],
975        row_locator: RowLocator,
976        len: usize,
977        candidate_indices: &[usize],
978        plan: &FieldPlan,
979        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
980        row_scratch: &mut [Option<(usize, usize)>],
981        allow_missing: bool,
982    ) -> Result<ArrayRef>
983    where
984        T: ArrowPrimitiveType,
985    {
986        shared_gather_rows_from_chunks::<T>(
987            row_ids,
988            row_locator,
989            len,
990            candidate_indices,
991            &plan.value_metas,
992            &plan.row_metas,
993            chunk_arrays,
994            row_scratch,
995            allow_missing,
996        )
997    }
998
    /// Delegates to the shared non-null row filter, consuming `columns` and
    /// returning the filtered column set.
    // NOTE(review): the filtering policy (which column(s) drive null checks)
    // lives entirely in the shared helper — see crate::gather.
    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
        shared_filter_rows_with_non_null(columns)
    }
1002}