llkv_column_map/store/projection.rs

1//! Projection planners for building Arrow batches from column chunks.
2//!
3//! The projection subsystem wires descriptor metadata, chunk loading helpers,
4//! and gather routines to materialize user-facing result sets. It centralizes
5//! null-handling policies and keeps the hot paths monomorphized per Arrow type.
6
7use super::*;
8use crate::gather::{
9    RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
10    gather_rows_from_chunks as shared_gather_rows_from_chunks,
11    gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
12    gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
13    gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
14    gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
15    gather_rows_single_shot as shared_gather_rows_single_shot,
16    gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
17    gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
18    gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
19    gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
20};
21use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
22use crate::types::{LogicalFieldId, RowId};
23use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
24use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
25use arrow::record_batch::RecordBatch;
26use llkv_result::{Error, Result};
27use llkv_storage::{
28    pager::{BatchGet, GetResult, Pager},
29    serialization::deserialize_array,
30    types::PhysicalKey,
31};
32use rustc_hash::{FxHashMap, FxHashSet};
33use simd_r_drive_entry_handle::EntryHandle;
34use std::borrow::Cow;
35use std::sync::Arc;
36
/// Policy controlling how a gather treats row IDs that are missing from a
/// column, and rows whose projected values are all null.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Any missing row ID results in an error.
    ErrorOnMissing,
    /// Missing rows surface as nulls in the result arrays.
    IncludeNulls,
    /// Missing rows and rows where all projected columns are null are dropped from the output.
    DropNulls,
}
46
/// A single projected column: the logical field to materialize plus an
/// optional alias for the resulting output field.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    // Identifier of the logical column to project.
    pub logical_field_id: LogicalFieldId,
    // Optional name override for the projected column in the output schema.
    pub alias: Option<String>,
}
52
53impl Projection {
54    pub fn new(logical_field_id: LogicalFieldId) -> Self {
55        Self {
56            logical_field_id,
57            alias: None,
58        }
59    }
60
61    pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
62        Self {
63            logical_field_id,
64            alias: Some(alias.into()),
65        }
66    }
67}
68
69impl From<LogicalFieldId> for Projection {
70    fn from(logical_field_id: LogicalFieldId) -> Self {
71        Projection::new(logical_field_id)
72    }
73}
74
75impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
76    fn from(value: (LogicalFieldId, S)) -> Self {
77        Projection::with_alias(value.0, value.1)
78    }
79}
80
81impl GatherNullPolicy {
82    #[inline]
83    fn allow_missing(self) -> bool {
84        !matches!(self, Self::ErrorOnMissing)
85    }
86}
87
/// Reusable state for gathering the same set of projected columns repeatedly.
///
/// Holds the per-field chunk plans plus caches and scratch buffers that
/// amortize chunk fetch/decode costs across successive gather calls.
pub struct MultiGatherContext {
    // Each projected field paired with its Arrow data type.
    field_infos: Vec<(LogicalFieldId, DataType)>,
    // Per-field chunk metadata plans, parallel to `field_infos`.
    plans: Vec<FieldPlan>,
    // Decoded chunk arrays keyed by physical chunk key; retained across calls.
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    // Scratch map from row id to output slot, reused between calls.
    row_index: FxHashMap<u64, usize>,
    // Per-row scratch slots; presumably (chunk index, offset in chunk) as
    // consumed by the shared gather helpers — confirm against crate::gather.
    row_scratch: Vec<Option<(usize, usize)>>,
    // Scratch list of chunk keys needed by the current gather.
    chunk_keys: Vec<PhysicalKey>,
}
96
97impl MultiGatherContext {
98    fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
99        Self {
100            chunk_cache: FxHashMap::default(),
101            row_index: FxHashMap::default(),
102            row_scratch: Vec::new(),
103            chunk_keys: Vec::new(),
104            field_infos,
105            plans,
106        }
107    }
108
109    #[inline]
110    pub fn is_empty(&self) -> bool {
111        self.plans.is_empty()
112    }
113
114    #[inline]
115    fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
116        &self.field_infos
117    }
118
119    #[inline]
120    fn plans(&self) -> &[FieldPlan] {
121        &self.plans
122    }
123
124    #[inline]
125    fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
126        &self.chunk_cache
127    }
128
129    #[inline]
130    fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
131        &mut self.chunk_cache
132    }
133
134    #[inline]
135    fn plans_mut(&mut self) -> &mut [FieldPlan] {
136        &mut self.plans
137    }
138
139    fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
140        std::mem::take(&mut self.chunk_keys)
141    }
142
143    fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
144        self.chunk_keys = keys;
145    }
146
147    fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
148        std::mem::take(&mut self.row_index)
149    }
150
151    fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
152        self.row_index = row_index;
153    }
154
155    fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
156        std::mem::take(&mut self.row_scratch)
157    }
158
159    fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
160        self.row_scratch = scratch;
161    }
162
163    pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
164        let first_plan = self.plans.first()?;
165        let mut chunk_idx = None;
166        for (idx, meta) in first_plan.row_metas.iter().enumerate() {
167            if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
168                chunk_idx = Some(idx);
169                break;
170            }
171        }
172        if chunk_idx.is_none() {
173            let total_chunks = first_plan.row_metas.len();
174            'outer: for idx in 0..total_chunks {
175                for plan in &self.plans {
176                    let meta = &plan.row_metas[idx];
177                    if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
178                        chunk_idx = Some(idx);
179                        break 'outer;
180                    }
181                }
182            }
183        }
184        let idx = chunk_idx?;
185
186        let mut span_min = u64::MAX;
187        let mut span_max = 0u64;
188        for plan in &self.plans {
189            let meta = plan.row_metas.get(idx)?;
190            span_min = span_min.min(meta.min_val_u64);
191            span_max = span_max.max(meta.max_val_u64);
192        }
193
194        if span_min > span_max {
195            return None;
196        }
197
198        Some((idx, span_min, span_max))
199    }
200}
201
/// Planning metadata for one projected column.
#[derive(Clone, Debug)]
struct FieldPlan {
    // Arrow data type of the column's values.
    dtype: DataType,
    // Metadata for each non-empty value chunk.
    value_metas: Vec<ChunkMetadata>,
    // Metadata for each non-empty row-id chunk, parallel to `value_metas`.
    row_metas: Vec<ChunkMetadata>,
    // Scratch: indices of chunks intersecting the current request.
    candidate_indices: Vec<usize>,
}
209
210impl<P> ColumnStore<P>
211where
212    P: Pager<Blob = EntryHandle> + Send + Sync,
213{
214    /// Gathers multiple columns using a configurable null-handling policy.
215    /// When [`GatherNullPolicy::DropNulls`] is selected, rows where all
216    /// projected columns are null or missing are removed from the
217    /// resulting batch.
218    pub fn gather_rows(
219        &self,
220        field_ids: &[LogicalFieldId],
221        row_ids: &[u64],
222        policy: GatherNullPolicy,
223    ) -> Result<RecordBatch> {
224        let mut ctx = self.prepare_gather_context(field_ids)?;
225        self.execute_gather_single_pass(&mut ctx, row_ids, policy)
226    }
227
    /// Executes a one-off gather using a freshly prepared context.
    ///
    /// This path reuses the planning metadata but fetches and decodes
    /// required chunks for this call only, avoiding the reusable caches
    /// maintained by [`Self::gather_rows_with_reusable_context`].
    ///
    /// # Errors
    /// Fails on duplicate `row_ids`, unsupported data types, pager failures,
    /// or (under [`GatherNullPolicy::ErrorOnMissing`]) missing rows.
    fn execute_gather_single_pass(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        // Nothing projected: an empty, schema-less batch is the only result.
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        // No rows requested: emit a zero-row batch that still carries the
        // projected schema so callers can inspect field names and types.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(field_infos.len());
            let mut fields = Vec::with_capacity(field_infos.len());
            for (fid, dtype) in &field_infos {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map each requested row id to its output slot; duplicates are
        // rejected because every output position must be written exactly once.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        // Chunk-intersection tests require the requested ids sorted ascending.
        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Select chunks whose row-id range may overlap the request, recording
        // both the value chunk and its paired row-id chunk for fetching.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        // Fetch every required chunk blob in a single pager round trip.
        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Materialize one output array per field, dispatching to the gather
        // helper monomorphized for that field's Arrow type.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                // Integer-like primitives are handled via the macro, which
                // expands one monomorphized call per supported Arrow type.
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        // DropNulls removes rows where every projected column is null/missing.
        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        // Mark fields nullable only when nulls can actually appear: always
        // under IncludeNulls, otherwise only if the gathered array has nulls.
        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }
391
    /// Plans a gather over `field_ids`, resolving each column's descriptor
    /// and chunk metadata (plus its paired row-id column) into a reusable
    /// [`MultiGatherContext`].
    ///
    /// # Errors
    /// Returns [`Error::NotFound`] when a field, its row-id companion, or a
    /// descriptor blob is missing, and an internal error when a column's
    /// value and row-id chunk counts disagree.
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        // Resolve the Arrow data type of every requested field up front.
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Look up the physical descriptor keys for each field and its row-id
        // shadow column while holding the catalog read lock.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        // Release the lock before doing pager I/O.
        drop(catalog);

        // Fetch all descriptor blobs (value + row-id per field) in one batch.
        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        // Decode each descriptor pair into a FieldPlan of non-empty chunks.
        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            // Value and row-id chunk lists must stay parallel; downstream
            // gather code indexes them with the same chunk index.
            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }
455
    /// Gathers rows while reusing chunk caches and scratch buffers stored in the context.
    ///
    /// This path amortizes chunk fetch and decode costs across multiple calls by
    /// retaining Arrow arrays and scratch state inside the provided context.
    ///
    /// # Errors
    /// Fails on duplicate `row_ids`, unsupported data types, pager failures,
    /// or (under [`GatherNullPolicy::ErrorOnMissing`]) missing rows.
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        // Nothing projected: an empty, schema-less batch is the only result.
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        // No rows requested: emit a zero-row batch that still carries the
        // projected schema so callers can inspect field names and types.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Move the scratch buffers out of the context so they can be used
        // while `ctx` is still borrowed elsewhere in this call.
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        // The body runs inside an immediately-invoked closure so that the
        // scratch buffers are returned to the context afterwards even when
        // an error short-circuits with `?`.
        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            // Ensure one scratch slot per requested row.
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            // Avoid an O(n log n) sort when the caller already passed the
            // ids in non-decreasing order.
            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // Fast path: strictly consecutive ids let row lookups become a
            // base-offset subtraction instead of a hash probe.
            let dense_base = if len == 0 {
                None
            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
            {
                Some(row_ids[0])
            } else {
                None
            };

            // Sparse path needs the hash index; it also rejects duplicates
            // (the dense path cannot contain any by construction).
            if dense_base.is_none() {
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            // Select chunks whose row-id range may overlap the request and
            // record both the value and row-id chunk keys to fetch.
            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch and decode only the chunks not already in the cache.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            // Materialize one output array per field, dispatching to the
            // gather helper monomorphized for that field's Arrow type.
            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    // Integer-like primitives are handled via the macro, which
                    // expands one monomorphized call per supported Arrow type.
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            // DropNulls removes rows where every projected column is null/missing.
            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            // Mark fields nullable only when nulls can actually appear:
            // always under IncludeNulls, otherwise only if nulls are present.
            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Hand the scratch buffers back so subsequent calls reuse capacity.
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }
692
693    fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
694        let mut metas = Vec::new();
695        if head_page_pk == 0 {
696            return Ok(metas);
697        }
698        for meta in DescriptorIterator::new(pager, head_page_pk) {
699            let meta = meta?;
700            if meta.row_count > 0 {
701                metas.push(meta);
702            }
703        }
704        Ok(metas)
705    }
706
707    #[inline]
708    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
709        if sorted_row_ids.is_empty() || meta.row_count == 0 {
710            return false;
711        }
712        let min = meta.min_val_u64;
713        let max = meta.max_val_u64;
714        if min == 0 && max == 0 && meta.row_count > 0 {
715            return true;
716        }
717        if min > max {
718            return true;
719        }
720        let min_req = sorted_row_ids[0];
721        let max_req = *sorted_row_ids.last().unwrap();
722        if max < min_req || min > max_req {
723            return false;
724        }
725        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
726        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
727    }
728
729    fn gather_rows_single_shot_string<O>(
730        row_index: &FxHashMap<u64, usize>,
731        len: usize,
732        plan: &FieldPlan,
733        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
734        allow_missing: bool,
735    ) -> Result<ArrayRef>
736    where
737        O: OffsetSizeTrait,
738    {
739        shared_gather_rows_single_shot_string::<O>(
740            row_index,
741            len,
742            &plan.value_metas,
743            &plan.row_metas,
744            &plan.candidate_indices,
745            chunk_blobs,
746            allow_missing,
747        )
748    }
749
750    #[allow(clippy::too_many_arguments)]
751    fn gather_rows_single_shot_bool(
752        row_index: &FxHashMap<u64, usize>,
753        len: usize,
754        plan: &FieldPlan,
755        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
756        allow_missing: bool,
757    ) -> Result<ArrayRef> {
758        shared_gather_rows_single_shot_bool(
759            row_index,
760            len,
761            &plan.value_metas,
762            &plan.row_metas,
763            &plan.candidate_indices,
764            chunk_blobs,
765            allow_missing,
766        )
767    }
768
769    fn gather_rows_single_shot_struct(
770        row_index: &FxHashMap<u64, usize>,
771        len: usize,
772        plan: &FieldPlan,
773        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
774        allow_missing: bool,
775        dtype: &DataType,
776    ) -> Result<ArrayRef> {
777        shared_gather_rows_single_shot_struct(
778            row_index,
779            len,
780            &plan.value_metas,
781            &plan.row_metas,
782            &plan.candidate_indices,
783            chunk_blobs,
784            allow_missing,
785            dtype,
786        )
787    }
788
789    #[allow(clippy::too_many_arguments)]
790    fn gather_rows_single_shot<T>(
791        row_index: &FxHashMap<u64, usize>,
792        len: usize,
793        plan: &FieldPlan,
794        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
795        allow_missing: bool,
796    ) -> Result<ArrayRef>
797    where
798        T: ArrowPrimitiveType,
799    {
800        shared_gather_rows_single_shot::<T>(
801            row_index,
802            len,
803            &plan.value_metas,
804            &plan.row_metas,
805            &plan.candidate_indices,
806            chunk_blobs,
807            allow_missing,
808        )
809    }
810
811    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
812    fn gather_rows_from_chunks_string<O>(
813        row_ids: &[u64],
814        row_locator: RowLocator,
815        len: usize,
816        candidate_indices: &[usize],
817        plan: &FieldPlan,
818        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
819        row_scratch: &mut [Option<(usize, usize)>],
820        allow_missing: bool,
821    ) -> Result<ArrayRef>
822    where
823        O: OffsetSizeTrait,
824    {
825        shared_gather_rows_from_chunks_string::<O>(
826            row_ids,
827            row_locator,
828            len,
829            candidate_indices,
830            &plan.value_metas,
831            &plan.row_metas,
832            chunk_arrays,
833            row_scratch,
834            allow_missing,
835        )
836    }
837
838    fn gather_rows_single_shot_binary<O>(
839        row_index: &FxHashMap<u64, usize>,
840        len: usize,
841        plan: &FieldPlan,
842        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
843        allow_missing: bool,
844    ) -> Result<ArrayRef>
845    where
846        O: OffsetSizeTrait,
847    {
848        shared_gather_rows_single_shot_binary::<O>(
849            row_index,
850            len,
851            &plan.value_metas,
852            &plan.row_metas,
853            &plan.candidate_indices,
854            chunk_blobs,
855            allow_missing,
856        )
857    }
858
859    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
860    fn gather_rows_from_chunks_binary<O>(
861        row_ids: &[u64],
862        row_locator: RowLocator,
863        len: usize,
864        candidate_indices: &[usize],
865        plan: &FieldPlan,
866        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
867        row_scratch: &mut [Option<(usize, usize)>],
868        allow_missing: bool,
869    ) -> Result<ArrayRef>
870    where
871        O: OffsetSizeTrait,
872    {
873        shared_gather_rows_from_chunks_binary::<O>(
874            row_ids,
875            row_locator,
876            len,
877            candidate_indices,
878            &plan.value_metas,
879            &plan.row_metas,
880            chunk_arrays,
881            row_scratch,
882            allow_missing,
883        )
884    }
885
886    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
887    fn gather_rows_from_chunks_bool(
888        row_ids: &[u64],
889        row_locator: RowLocator,
890        len: usize,
891        candidate_indices: &[usize],
892        plan: &FieldPlan,
893        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
894        row_scratch: &mut [Option<(usize, usize)>],
895        allow_missing: bool,
896    ) -> Result<ArrayRef> {
897        shared_gather_rows_from_chunks_bool(
898            row_ids,
899            row_locator,
900            len,
901            candidate_indices,
902            &plan.value_metas,
903            &plan.row_metas,
904            chunk_arrays,
905            row_scratch,
906            allow_missing,
907        )
908    }
909
910    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
911    fn gather_rows_from_chunks_struct(
912        row_locator: RowLocator,
913        len: usize,
914        candidate_indices: &[usize],
915        plan: &FieldPlan,
916        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
917        row_scratch: &mut [Option<(usize, usize)>],
918        allow_missing: bool,
919        dtype: &DataType,
920    ) -> Result<ArrayRef> {
921        shared_gather_rows_from_chunks_struct(
922            row_locator,
923            len,
924            candidate_indices,
925            &plan.value_metas,
926            &plan.row_metas,
927            chunk_arrays,
928            row_scratch,
929            allow_missing,
930            dtype,
931        )
932    }
933
934    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and keeps type monomorphization straightforward.
935    fn gather_rows_from_chunks<T>(
936        row_ids: &[u64],
937        row_locator: RowLocator,
938        len: usize,
939        candidate_indices: &[usize],
940        plan: &FieldPlan,
941        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
942        row_scratch: &mut [Option<(usize, usize)>],
943        allow_missing: bool,
944    ) -> Result<ArrayRef>
945    where
946        T: ArrowPrimitiveType,
947    {
948        shared_gather_rows_from_chunks::<T>(
949            row_ids,
950            row_locator,
951            len,
952            candidate_indices,
953            &plan.value_metas,
954            &plan.row_metas,
955            chunk_arrays,
956            row_scratch,
957            allow_missing,
958        )
959    }
960
961    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
962        shared_filter_rows_with_non_null(columns)
963    }
964}