// llkv_column_map/store/projection.rs

1use super::*;
2use crate::gather::{
3    RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
4    gather_rows_from_chunks as shared_gather_rows_from_chunks,
5    gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
6    gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
7    gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
8    gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
9    gather_rows_single_shot as shared_gather_rows_single_shot,
10    gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
11    gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
12    gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
13    gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
14};
15use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
16use crate::types::{LogicalFieldId, RowId};
17use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
18use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
19use arrow::record_batch::RecordBatch;
20use llkv_result::{Error, Result};
21use llkv_storage::{
22    pager::{BatchGet, GetResult, Pager},
23    serialization::deserialize_array,
24    types::PhysicalKey,
25};
26use rustc_hash::{FxHashMap, FxHashSet};
27use simd_r_drive_entry_handle::EntryHandle;
28use std::borrow::Cow;
29use std::sync::Arc;
30
/// Controls how row IDs that are missing from the store (or rows that are
/// entirely null) are handled during a gather.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Any missing row ID results in an error.
    ErrorOnMissing,
    /// Missing rows surface as nulls in the result arrays.
    IncludeNulls,
    /// Missing rows and rows where all projected columns are null are dropped from the output.
    DropNulls,
}
40
/// A single projected column: the logical field to read plus an optional
/// output alias for the resulting column name.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    // Column to project.
    pub logical_field_id: LogicalFieldId,
    // Optional display name; presumably overrides the generated
    // `field_<id>` name at higher layers — confirm at call sites.
    pub alias: Option<String>,
}
46
47impl Projection {
48    pub fn new(logical_field_id: LogicalFieldId) -> Self {
49        Self {
50            logical_field_id,
51            alias: None,
52        }
53    }
54
55    pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
56        Self {
57            logical_field_id,
58            alias: Some(alias.into()),
59        }
60    }
61}
62
63impl From<LogicalFieldId> for Projection {
64    fn from(logical_field_id: LogicalFieldId) -> Self {
65        Projection::new(logical_field_id)
66    }
67}
68
69impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
70    fn from(value: (LogicalFieldId, S)) -> Self {
71        Projection::with_alias(value.0, value.1)
72    }
73}
74
75impl GatherNullPolicy {
76    #[inline]
77    fn allow_missing(self) -> bool {
78        !matches!(self, Self::ErrorOnMissing)
79    }
80}
81
/// Reusable state for repeated multi-column gathers.
///
/// Holds the per-field plans plus caches and scratch buffers that
/// `gather_rows_with_reusable_context` recycles across calls to amortize
/// chunk fetch and decode costs.
pub struct MultiGatherContext {
    // (field id, Arrow dtype) per projected column, in projection order.
    field_infos: Vec<(LogicalFieldId, DataType)>,
    // One plan per field, parallel to `field_infos`.
    plans: Vec<FieldPlan>,
    // Decoded chunk arrays keyed by physical key; retained across gathers.
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    // Scratch map from row id -> output slot (sparse lookup path).
    row_index: FxHashMap<u64, usize>,
    // Per-row scratch slots handed to the shared gather kernels;
    // presumably (chunk index, position) — see crate::gather for semantics.
    row_scratch: Vec<Option<(usize, usize)>>,
    // Scratch list of chunk keys needed by the current gather.
    chunk_keys: Vec<PhysicalKey>,
}
90
91impl MultiGatherContext {
92    fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
93        Self {
94            chunk_cache: FxHashMap::default(),
95            row_index: FxHashMap::default(),
96            row_scratch: Vec::new(),
97            chunk_keys: Vec::new(),
98            field_infos,
99            plans,
100        }
101    }
102
103    #[inline]
104    pub fn is_empty(&self) -> bool {
105        self.plans.is_empty()
106    }
107
108    #[inline]
109    fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
110        &self.field_infos
111    }
112
113    #[inline]
114    fn plans(&self) -> &[FieldPlan] {
115        &self.plans
116    }
117
118    #[inline]
119    fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
120        &self.chunk_cache
121    }
122
123    #[inline]
124    fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
125        &mut self.chunk_cache
126    }
127
128    #[inline]
129    fn plans_mut(&mut self) -> &mut [FieldPlan] {
130        &mut self.plans
131    }
132
133    fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
134        std::mem::take(&mut self.chunk_keys)
135    }
136
137    fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
138        self.chunk_keys = keys;
139    }
140
141    fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
142        std::mem::take(&mut self.row_index)
143    }
144
145    fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
146        self.row_index = row_index;
147    }
148
149    fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
150        std::mem::take(&mut self.row_scratch)
151    }
152
153    fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
154        self.row_scratch = scratch;
155    }
156
157    pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
158        let first_plan = self.plans.first()?;
159        let mut chunk_idx = None;
160        for (idx, meta) in first_plan.row_metas.iter().enumerate() {
161            if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
162                chunk_idx = Some(idx);
163                break;
164            }
165        }
166        if chunk_idx.is_none() {
167            let total_chunks = first_plan.row_metas.len();
168            'outer: for idx in 0..total_chunks {
169                for plan in &self.plans {
170                    let meta = &plan.row_metas[idx];
171                    if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
172                        chunk_idx = Some(idx);
173                        break 'outer;
174                    }
175                }
176            }
177        }
178        let idx = chunk_idx?;
179
180        let mut span_min = u64::MAX;
181        let mut span_max = 0u64;
182        for plan in &self.plans {
183            let meta = plan.row_metas.get(idx)?;
184            span_min = span_min.min(meta.min_val_u64);
185            span_max = span_max.max(meta.max_val_u64);
186        }
187
188        if span_min > span_max {
189            return None;
190        }
191
192        Some((idx, span_min, span_max))
193    }
194}
195
/// Per-column gather plan: chunk metadata for the value column and its
/// row-id column, plus scratch for the chunk indices relevant to the
/// current request.
#[derive(Clone, Debug)]
struct FieldPlan {
    // Arrow type of the projected values.
    dtype: DataType,
    // Metadata for each value chunk (kept in lockstep with `row_metas`).
    value_metas: Vec<ChunkMetadata>,
    // Metadata for each row-id chunk (kept in lockstep with `value_metas`).
    row_metas: Vec<ChunkMetadata>,
    // Indices into the meta vectors that may intersect the requested rows;
    // recomputed on every gather.
    candidate_indices: Vec<usize>,
}
203
204impl<P> ColumnStore<P>
205where
206    P: Pager<Blob = EntryHandle> + Send + Sync,
207{
208    /// Gathers multiple columns using a configurable null-handling policy.
209    /// When [`GatherNullPolicy::DropNulls`] is selected, rows where all
210    /// projected columns are null or missing are removed from the
211    /// resulting batch.
212    pub fn gather_rows(
213        &self,
214        field_ids: &[LogicalFieldId],
215        row_ids: &[u64],
216        policy: GatherNullPolicy,
217    ) -> Result<RecordBatch> {
218        let mut ctx = self.prepare_gather_context(field_ids)?;
219        self.execute_gather_single_pass(&mut ctx, row_ids, policy)
220    }
221
    /// Executes a one-off gather using a freshly prepared context.
    ///
    /// This path reuses the planning metadata but fetches and decodes
    /// required chunks for this call only, avoiding the reusable caches
    /// maintained by [`Self::gather_rows_with_reusable_context`].
    fn execute_gather_single_pass(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        // No projected columns: empty schema, empty batch.
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        // No requested rows: emit one empty array per projected column.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(field_infos.len());
            let mut fields = Vec::with_capacity(field_infos.len());
            for (fid, dtype) in &field_infos {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map each requested row id to its output slot; duplicates are
        // rejected because each output position must be written exactly once.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        // Sorted copy drives the per-chunk range-intersection test.
        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Collect the physical keys of every value/row-id chunk that may
        // contain a requested row, recording candidate indices per plan.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        // Fetch all candidate chunks in one batched pager round-trip.
        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Gather each column via the dtype-specialized single-shot kernel.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                // Remaining (primitive) dtypes dispatch through the integer
                // Arrow-type macro; anything else is unsupported.
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        // DropNulls keeps only rows with at least one non-null value.
        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        // Build the output schema; columns are named `field_<id>`.
        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }
385
    /// Resolves descriptors and chunk metadata for `field_ids` and packages
    /// them into a reusable [`MultiGatherContext`].
    ///
    /// # Errors
    /// Returns [`Error::NotFound`] when a field (or its paired row-id column,
    /// via `rowid_fid`) has no catalog entry or descriptor blob, and an
    /// internal error when a column's value/row-id chunk counts disagree.
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        // Resolve each field's Arrow type up front (also validates existence).
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Look up physical descriptor keys for each value column and its
        // row-id companion; the catalog lock is dropped before any I/O.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        // Fetch all descriptor blobs in a single batched read.
        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        // Build one plan per field from the non-empty chunk metadata of the
        // value column and its row-id column.
        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            // Value and row-id chunks are expected in lockstep; a mismatch
            // indicates an inconsistent column.
            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }
449
    /// Gathers rows while reusing chunk caches and scratch buffers stored in the context.
    ///
    /// This path amortizes chunk fetch and decode costs across multiple calls by
    /// retaining Arrow arrays and scratch state inside the provided context.
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        // No projected columns: empty schema, empty batch.
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        // No requested rows: emit one empty array per projected column.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Move scratch state out of the context so it can be mutated while
        // `ctx` is still borrowed for plan/cache access; it is stored back
        // after the closure runs, on both success and failure.
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            // Avoid a sort when the caller's ids are already ordered.
            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // Strictly consecutive ids can be located arithmetically (dense
            // path) instead of through the hash map.
            let dense_base = if len == 0 {
                None
            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                // Sparse path: map row id -> output slot, rejecting
                // duplicates since each slot must be written exactly once.
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            // Record candidate chunk indices per plan and the physical keys
            // of every chunk this gather may touch.
            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch and decode only the chunks missing from the cache.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            // Gather each column via the dtype-specialized cached-chunk kernel.
            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    // Remaining (primitive) dtypes dispatch through the
                    // integer Arrow-type macro; anything else is unsupported.
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            // DropNulls keeps only rows with at least one non-null value.
            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            // Build the output schema; columns are named `field_<id>`.
            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Return the scratch buffers to the context for the next call.
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }
686
687    fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
688        let mut metas = Vec::new();
689        if head_page_pk == 0 {
690            return Ok(metas);
691        }
692        for meta in DescriptorIterator::new(pager, head_page_pk) {
693            let meta = meta?;
694            if meta.row_count > 0 {
695                metas.push(meta);
696            }
697        }
698        Ok(metas)
699    }
700
701    #[inline]
702    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
703        if sorted_row_ids.is_empty() || meta.row_count == 0 {
704            return false;
705        }
706        let min = meta.min_val_u64;
707        let max = meta.max_val_u64;
708        if min == 0 && max == 0 && meta.row_count > 0 {
709            return true;
710        }
711        if min > max {
712            return true;
713        }
714        let min_req = sorted_row_ids[0];
715        let max_req = *sorted_row_ids.last().unwrap();
716        if max < min_req || min > max_req {
717            return false;
718        }
719        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
720        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
721    }
722
723    fn gather_rows_single_shot_string<O>(
724        row_index: &FxHashMap<u64, usize>,
725        len: usize,
726        plan: &FieldPlan,
727        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
728        allow_missing: bool,
729    ) -> Result<ArrayRef>
730    where
731        O: OffsetSizeTrait,
732    {
733        shared_gather_rows_single_shot_string::<O>(
734            row_index,
735            len,
736            &plan.value_metas,
737            &plan.row_metas,
738            &plan.candidate_indices,
739            chunk_blobs,
740            allow_missing,
741        )
742    }
743
744    #[allow(clippy::too_many_arguments)]
745    fn gather_rows_single_shot_bool(
746        row_index: &FxHashMap<u64, usize>,
747        len: usize,
748        plan: &FieldPlan,
749        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
750        allow_missing: bool,
751    ) -> Result<ArrayRef> {
752        shared_gather_rows_single_shot_bool(
753            row_index,
754            len,
755            &plan.value_metas,
756            &plan.row_metas,
757            &plan.candidate_indices,
758            chunk_blobs,
759            allow_missing,
760        )
761    }
762
763    fn gather_rows_single_shot_struct(
764        row_index: &FxHashMap<u64, usize>,
765        len: usize,
766        plan: &FieldPlan,
767        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
768        allow_missing: bool,
769        dtype: &DataType,
770    ) -> Result<ArrayRef> {
771        shared_gather_rows_single_shot_struct(
772            row_index,
773            len,
774            &plan.value_metas,
775            &plan.row_metas,
776            &plan.candidate_indices,
777            chunk_blobs,
778            allow_missing,
779            dtype,
780        )
781    }
782
783    #[allow(clippy::too_many_arguments)]
784    fn gather_rows_single_shot<T>(
785        row_index: &FxHashMap<u64, usize>,
786        len: usize,
787        plan: &FieldPlan,
788        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
789        allow_missing: bool,
790    ) -> Result<ArrayRef>
791    where
792        T: ArrowPrimitiveType,
793    {
794        shared_gather_rows_single_shot::<T>(
795            row_index,
796            len,
797            &plan.value_metas,
798            &plan.row_metas,
799            &plan.candidate_indices,
800            chunk_blobs,
801            allow_missing,
802        )
803    }
804
805    #[allow(clippy::too_many_arguments)] // TODO: Refactor
806    fn gather_rows_from_chunks_string<O>(
807        row_ids: &[u64],
808        row_locator: RowLocator,
809        len: usize,
810        candidate_indices: &[usize],
811        plan: &FieldPlan,
812        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
813        row_scratch: &mut [Option<(usize, usize)>],
814        allow_missing: bool,
815    ) -> Result<ArrayRef>
816    where
817        O: OffsetSizeTrait,
818    {
819        shared_gather_rows_from_chunks_string::<O>(
820            row_ids,
821            row_locator,
822            len,
823            candidate_indices,
824            &plan.value_metas,
825            &plan.row_metas,
826            chunk_arrays,
827            row_scratch,
828            allow_missing,
829        )
830    }
831
832    fn gather_rows_single_shot_binary<O>(
833        row_index: &FxHashMap<u64, usize>,
834        len: usize,
835        plan: &FieldPlan,
836        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
837        allow_missing: bool,
838    ) -> Result<ArrayRef>
839    where
840        O: OffsetSizeTrait,
841    {
842        shared_gather_rows_single_shot_binary::<O>(
843            row_index,
844            len,
845            &plan.value_metas,
846            &plan.row_metas,
847            &plan.candidate_indices,
848            chunk_blobs,
849            allow_missing,
850        )
851    }
852
853    #[allow(clippy::too_many_arguments)] // TODO: Refactor
854    fn gather_rows_from_chunks_binary<O>(
855        row_ids: &[u64],
856        row_locator: RowLocator,
857        len: usize,
858        candidate_indices: &[usize],
859        plan: &FieldPlan,
860        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
861        row_scratch: &mut [Option<(usize, usize)>],
862        allow_missing: bool,
863    ) -> Result<ArrayRef>
864    where
865        O: OffsetSizeTrait,
866    {
867        shared_gather_rows_from_chunks_binary::<O>(
868            row_ids,
869            row_locator,
870            len,
871            candidate_indices,
872            &plan.value_metas,
873            &plan.row_metas,
874            chunk_arrays,
875            row_scratch,
876            allow_missing,
877        )
878    }
879
880    #[allow(clippy::too_many_arguments)] // TODO: Refactor
881    fn gather_rows_from_chunks_bool(
882        row_ids: &[u64],
883        row_locator: RowLocator,
884        len: usize,
885        candidate_indices: &[usize],
886        plan: &FieldPlan,
887        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
888        row_scratch: &mut [Option<(usize, usize)>],
889        allow_missing: bool,
890    ) -> Result<ArrayRef> {
891        shared_gather_rows_from_chunks_bool(
892            row_ids,
893            row_locator,
894            len,
895            candidate_indices,
896            &plan.value_metas,
897            &plan.row_metas,
898            chunk_arrays,
899            row_scratch,
900            allow_missing,
901        )
902    }
903
904    #[allow(clippy::too_many_arguments)] // TODO: Refactor
905    fn gather_rows_from_chunks_struct(
906        row_locator: RowLocator,
907        len: usize,
908        candidate_indices: &[usize],
909        plan: &FieldPlan,
910        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
911        row_scratch: &mut [Option<(usize, usize)>],
912        allow_missing: bool,
913        dtype: &DataType,
914    ) -> Result<ArrayRef> {
915        shared_gather_rows_from_chunks_struct(
916            row_locator,
917            len,
918            candidate_indices,
919            &plan.value_metas,
920            &plan.row_metas,
921            chunk_arrays,
922            row_scratch,
923            allow_missing,
924            dtype,
925        )
926    }
927
928    #[allow(clippy::too_many_arguments)] // TODO: Refactor
929    fn gather_rows_from_chunks<T>(
930        row_ids: &[u64],
931        row_locator: RowLocator,
932        len: usize,
933        candidate_indices: &[usize],
934        plan: &FieldPlan,
935        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
936        row_scratch: &mut [Option<(usize, usize)>],
937        allow_missing: bool,
938    ) -> Result<ArrayRef>
939    where
940        T: ArrowPrimitiveType,
941    {
942        shared_gather_rows_from_chunks::<T>(
943            row_ids,
944            row_locator,
945            len,
946            candidate_indices,
947            &plan.value_metas,
948            &plan.row_metas,
949            chunk_arrays,
950            row_scratch,
951            allow_missing,
952        )
953    }
954
    /// Drops every output row whose value is null in all projected columns,
    /// delegating the filtering to the shared gather helper.
    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
        shared_filter_rows_with_non_null(columns)
    }
958}