llkv_column_map/store/scan/
mod.rs

1//! Primitive column scanners and visitor traits used by ColumnStore.
2//!
3//! Focus: integer primitives with fast unsorted and sorted scans.
4//! - Monomorphized per-type paths (no dynamic dispatch in hot loops)
5//! - Batches IO with pager, minimal object churn
6//! - Coalesces adjacent items into runs for sorted scans
7//! - Optional row-id variants
8//!
9//! This module is intentionally self-contained so higher-level methods in
10//! `store::mod` can delegate without code duplication.
11
12use std::cmp::Ordering;
13use std::collections::BinaryHeap;
14use std::ops::{Bound, RangeBounds};
15
16use arrow::array::*;
17use arrow::compute;
18
19use super::ColumnStore;
20use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
21use crate::types::{LogicalFieldId, Namespace};
22use llkv_result::{Error, Result};
23use llkv_storage::{
24    pager::{BatchGet, GetResult, Pager},
25    serialization::deserialize_array,
26    types::PhysicalKey,
27};
28use simd_r_drive_entry_handle::EntryHandle;
29
30pub mod builder;
31pub use builder::*;
32
33pub mod kmerge;
34pub use kmerge::*;
35
36pub mod options;
37pub use options::*;
38
39pub mod unsorted;
40pub use unsorted::*;
41
42pub mod ranges;
43pub use ranges::*;
44
45pub mod sorted;
46pub use sorted::*;
47
48pub mod visitors;
49pub use visitors::*;
50
51pub mod filter;
52pub use filter::*;
53
54impl<P> ColumnStore<P>
55where
56    P: Pager<Blob = EntryHandle>,
57{
58    /// Unsorted scan with typed visitor callbacks per chunk.
59    fn scan_visit(
60        &self,
61        field_id: LogicalFieldId,
62        visitor: &mut dyn crate::store::scan::PrimitiveVisitor,
63    ) -> Result<()> {
64        let catalog = self.catalog.read().unwrap();
65        crate::store::scan::unsorted_visit(self.pager.as_ref(), &catalog.map, field_id, visitor)
66    }
67
68    /// Convenience: sorted scan with closures over coalesced runs.
69    /// Sorted scan with typed visitor callbacks over coalesced runs.
70    fn scan_sorted_visit(
71        &self,
72        field_id: LogicalFieldId,
73        visitor: &mut dyn crate::store::scan::PrimitiveSortedVisitor,
74    ) -> Result<()> {
75        let catalog = self.catalog.read().unwrap();
76        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
77        let desc_blob = self
78            .pager
79            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
80            .pop()
81            .and_then(|r| match r {
82                GetResult::Raw { bytes, .. } => Some(bytes),
83                _ => None,
84            })
85            .ok_or(Error::NotFound)?;
86        let desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
87        drop(catalog);
88
89        let mut metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
90        for m in crate::store::descriptor::DescriptorIterator::new(
91            self.pager.as_ref(),
92            desc.head_page_pk,
93        ) {
94            let meta = m?;
95            if meta.row_count == 0 {
96                continue;
97            }
98            if meta.value_order_perm_pk == 0 {
99                return Err(Error::NotFound);
100            }
101            metas.push(meta);
102        }
103        if metas.is_empty() {
104            return Ok(());
105        }
106        let buffers = sorted::load_sorted_buffers(self.pager.as_ref(), &metas)?;
107        let first_any =
108            llkv_storage::serialization::deserialize_array(buffers.value_handle(0).clone())?;
109        with_integer_arrow_type!(
110            first_any.data_type().clone(),
111            |ArrowTy| {
112                <ArrowTy as sorted::SortedDispatch>::visit(
113                    self.pager.as_ref(),
114                    &metas,
115                    &buffers,
116                    visitor,
117                )
118            },
119            Err(Error::Internal("unsupported sorted dtype".into())),
120        )
121    }
122
123    /// Sorted scan in reverse (descending) with typed visitor callbacks.
124    fn scan_sorted_visit_reverse(
125        &self,
126        field_id: LogicalFieldId,
127        visitor: &mut dyn crate::store::scan::PrimitiveSortedVisitor,
128    ) -> Result<()> {
129        let catalog = self.catalog.read().unwrap();
130        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
131        let desc_blob = self
132            .pager
133            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
134            .pop()
135            .and_then(|r| match r {
136                GetResult::Raw { bytes, .. } => Some(bytes),
137                _ => None,
138            })
139            .ok_or(Error::NotFound)?;
140        let desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
141        drop(catalog);
142
143        let mut metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
144        for m in crate::store::descriptor::DescriptorIterator::new(
145            self.pager.as_ref(),
146            desc.head_page_pk,
147        ) {
148            let meta = m?;
149            if meta.row_count == 0 {
150                continue;
151            }
152            if meta.value_order_perm_pk == 0 {
153                return Err(Error::NotFound);
154            }
155            metas.push(meta);
156        }
157        if metas.is_empty() {
158            return Ok(());
159        }
160        let mut gets: Vec<BatchGet> = Vec::with_capacity(metas.len() * 2);
161        for m in &metas {
162            gets.push(BatchGet::Raw { key: m.chunk_pk });
163            gets.push(BatchGet::Raw {
164                key: m.value_order_perm_pk,
165            });
166        }
167        let buffers = sorted::load_sorted_buffers(self.pager.as_ref(), &metas)?;
168        let first_any =
169            llkv_storage::serialization::deserialize_array(buffers.value_handle(0).clone())?;
170        with_integer_arrow_type!(
171            first_any.data_type().clone(),
172            |ArrowTy| {
173                <ArrowTy as sorted::SortedDispatch>::visit_rev(
174                    self.pager.as_ref(),
175                    &metas,
176                    &buffers,
177                    visitor,
178                )
179            },
180            Err(Error::Internal("unsupported sorted dtype".into())),
181        )
182    }
183
184    /// Unified scan entrypoint configured by ScanOptions.
185    /// Requires `V` to implement both unsorted and sorted visitor traits; methods are no-ops by default.
186    pub fn scan(
187        &self,
188        field_id: LogicalFieldId,
189        opts: ScanOptions,
190        visitor: &mut dyn crate::store::scan::PrimitiveFullVisitor,
191    ) -> Result<()> {
192        let paginate = opts.offset > 0 || opts.limit.is_some();
193        if !opts.sorted {
194            if opts.with_row_ids {
195                let row_fid = field_id.with_namespace(Namespace::RowIdShadow);
196                let catalog = self.catalog.read().unwrap();
197                if opts.include_nulls {
198                    let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
199                    if paginate {
200                        let mut pv = crate::store::scan::PaginateVisitor::new(
201                            visitor,
202                            opts.offset,
203                            opts.limit,
204                        );
205                        return crate::store::scan::unsorted_with_row_ids_and_nulls_visit(
206                            self.pager.as_ref(),
207                            &catalog.map,
208                            field_id,
209                            row_fid,
210                            anchor_fid,
211                            opts.nulls_first,
212                            &mut pv,
213                        );
214                    } else {
215                        return crate::store::scan::unsorted_with_row_ids_and_nulls_visit(
216                            self.pager.as_ref(),
217                            &catalog.map,
218                            field_id,
219                            row_fid,
220                            anchor_fid,
221                            opts.nulls_first,
222                            visitor,
223                        );
224                    }
225                }
226                if paginate {
227                    let mut pv =
228                        crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
229                    return crate::store::scan::unsorted_with_row_ids_visit(
230                        self.pager.as_ref(),
231                        &catalog.map,
232                        field_id,
233                        row_fid,
234                        &mut pv,
235                    );
236                } else {
237                    return crate::store::scan::unsorted_with_row_ids_visit(
238                        self.pager.as_ref(),
239                        &catalog.map,
240                        field_id,
241                        row_fid,
242                        visitor,
243                    );
244                }
245            }
246            if paginate {
247                let mut pv =
248                    crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
249                return self.scan_visit(field_id, &mut pv);
250            } else {
251                return self.scan_visit(field_id, visitor);
252            }
253        }
254
255        if opts.with_row_ids {
256            let row_fid = field_id.with_namespace(Namespace::RowIdShadow);
257            // Prepare value metas and blobs
258            let catalog = self.catalog.read().unwrap();
259            let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
260            let desc_blob = self
261                .pager
262                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
263                .pop()
264                .and_then(|r| match r {
265                    GetResult::Raw { bytes, .. } => Some(bytes),
266                    _ => None,
267                })
268                .ok_or(Error::NotFound)?;
269            let desc =
270                crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
271
272            let rid_descriptor_pk = *catalog.map.get(&row_fid).ok_or(Error::NotFound)?;
273            let rid_desc_blob = self
274                .pager
275                .batch_get(&[BatchGet::Raw {
276                    key: rid_descriptor_pk,
277                }])?
278                .pop()
279                .and_then(|r| match r {
280                    GetResult::Raw { bytes, .. } => Some(bytes),
281                    _ => None,
282                })
283                .ok_or(Error::NotFound)?;
284            let rid_desc =
285                crate::store::descriptor::ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
286            drop(catalog);
287
288            let mut metas_val: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
289            for m in crate::store::descriptor::DescriptorIterator::new(
290                self.pager.as_ref(),
291                desc.head_page_pk,
292            ) {
293                let meta = m?;
294                if meta.row_count == 0 {
295                    continue;
296                }
297                if meta.value_order_perm_pk == 0 {
298                    return Err(Error::NotFound);
299                }
300                metas_val.push(meta);
301            }
302            let mut metas_rid: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
303            for m in crate::store::descriptor::DescriptorIterator::new(
304                self.pager.as_ref(),
305                rid_desc.head_page_pk,
306            ) {
307                let meta = m?;
308                if meta.row_count == 0 {
309                    continue;
310                }
311                metas_rid.push(meta);
312            }
313            if metas_val.is_empty() {
314                return Ok(());
315            }
316            if metas_val.len() != metas_rid.len() {
317                return Err(Error::Internal(
318                    "sorted_with_row_ids: chunk count mismatch".into(),
319                ));
320            }
321
322            let buffers =
323                sorted::load_sorted_buffers_with_rids(self.pager.as_ref(), &metas_val, &metas_rid)?;
324            let first_any = llkv_storage::serialization::deserialize_array(
325                buffers.base().value_handle(0).clone(),
326            )?;
327            if opts.reverse {
328                if paginate {
329                    let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
330                        visitor,
331                        opts.offset,
332                        opts.limit,
333                        true,
334                    );
335                    let vals_res = with_integer_arrow_type!(
336                        first_any.data_type().clone(),
337                        |ArrowTy| {
338                            <ArrowTy as sorted::SortedDispatch>::visit_with_rids_rev(
339                                self.pager.as_ref(),
340                                &metas_val,
341                                &metas_rid,
342                                &buffers,
343                                &mut pv,
344                            )
345                        },
346                        Err(Error::Internal("unsupported sorted dtype".into())),
347                    );
348                    if opts.include_nulls {
349                        let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
350                        let catalog = self.catalog.read().unwrap();
351                        let anchor_desc_pk =
352                            *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
353                        let anchor_desc_blob = self
354                            .pager
355                            .batch_get(&[BatchGet::Raw {
356                                key: anchor_desc_pk,
357                            }])?
358                            .pop()
359                            .and_then(|r| match r {
360                                GetResult::Raw { bytes, .. } => Some(bytes),
361                                _ => None,
362                            })
363                            .ok_or(Error::NotFound)?;
364                        let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
365                            anchor_desc_blob.as_ref(),
366                        );
367                        drop(catalog);
368                        let mut anchor_rids: Vec<UInt64Array> = Vec::new();
369                        let mut get_keys: Vec<BatchGet> = Vec::new();
370                        let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> =
371                            Vec::new();
372                        for m in crate::store::descriptor::DescriptorIterator::new(
373                            self.pager.as_ref(),
374                            anchor_desc.head_page_pk,
375                        ) {
376                            let mm = m?;
377                            if mm.row_count == 0 {
378                                continue;
379                            }
380                            anchor_metas.push(mm);
381                            get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
382                        }
383                        if !anchor_metas.is_empty() {
384                            let res = self.pager.batch_get(&get_keys)?;
385                            for r in res {
386                                if let GetResult::Raw { bytes, .. } = r {
387                                    let any = deserialize_array(bytes)?;
388                                    let arr = any
389                                        .as_any()
390                                        .downcast_ref::<UInt64Array>()
391                                        .ok_or_else(|| {
392                                            Error::Internal("anchor rid not u64".into())
393                                        })?
394                                        .clone();
395                                    anchor_rids.push(arr);
396                                }
397                            }
398                        }
399                        let mut present_rids: Vec<UInt64Array> = Vec::new();
400                        for mr in &metas_rid {
401                            let any = deserialize_array(
402                                buffers
403                                    .rid_by_pk(mr.chunk_pk)
404                                    .ok_or(Error::NotFound)?
405                                    .clone(),
406                            )?;
407                            let arr = any
408                                .as_any()
409                                .downcast_ref::<UInt64Array>()
410                                .ok_or_else(|| Error::Internal("rid not u64".into()))?
411                                .clone();
412                            present_rids.push(arr);
413                        }
414                        let mut ai = anchor_rids.len();
415                        let mut aj: isize = -1; // reverse traversal
416                        let mut pi = present_rids.len();
417                        let mut pj: isize = -1;
418                        let mut buf: Vec<u64> = Vec::new();
419                        let mut flush = |v: &mut Vec<u64>| -> Result<()> {
420                            if v.is_empty() {
421                                return Ok(());
422                            }
423                            let arr = UInt64Array::from(std::mem::take(v));
424                            let len = arr.len();
425                            pv.null_run(&arr, 0, len);
426                            Ok(())
427                        };
428                        // Emit order: nulls_first => nulls then values; else values then nulls
429                        if opts.nulls_first {
430                            // Emit nulls first (descending rid order)
431                            if ai > 0 {
432                                ai -= 1;
433                                aj = anchor_rids[ai].len() as isize - 1;
434                            }
435                            if pi > 0 {
436                                pi -= 1;
437                                pj = present_rids[pi].len() as isize - 1;
438                            }
439                            while ai < anchor_rids.len() {
440                                // ai unsigned; exit when wrap logic ends
441                                let a = &anchor_rids[ai];
442                                while aj >= 0 {
443                                    let av = a.value(aj as usize);
444                                    // advance present down to <= av
445                                    while pi < present_rids.len() {
446                                        let p = &present_rids[pi];
447                                        if pj < 0 {
448                                            if pi == 0 {
449                                                break;
450                                            }
451                                            pi -= 1;
452                                            pj = present_rids[pi].len() as isize - 1;
453                                            continue;
454                                        }
455                                        let pv2 = p.value(pj as usize);
456                                        if pv2 > av {
457                                            pj -= 1;
458                                        } else {
459                                            break;
460                                        }
461                                    }
462                                    let present_eq = if pi < present_rids.len() {
463                                        let p = &present_rids[pi];
464                                        if pj >= 0 {
465                                            p.value(pj as usize) == av
466                                        } else {
467                                            false
468                                        }
469                                    } else {
470                                        false
471                                    };
472                                    if !present_eq {
473                                        buf.push(av);
474                                    }
475                                    if present_eq {
476                                        pj -= 1;
477                                    }
478                                    aj -= 1;
479                                    if buf.len() >= 4096 {
480                                        flush(&mut buf)?;
481                                    }
482                                }
483                                if ai == 0 {
484                                    break;
485                                }
486                                ai -= 1;
487                                aj = anchor_rids[ai].len() as isize - 1;
488                            }
489                            flush(&mut buf)?;
490                            vals_res
491                        } else {
492                            vals_res.and_then(|_| {
493                                if ai > 0 {
494                                    ai -= 1;
495                                    aj = anchor_rids[ai].len() as isize - 1;
496                                }
497                                if pi > 0 {
498                                    pi -= 1;
499                                    pj = present_rids[pi].len() as isize - 1;
500                                }
501                                while ai < anchor_rids.len() {
502                                    let a = &anchor_rids[ai];
503                                    while aj >= 0 {
504                                        let av = a.value(aj as usize);
505                                        while pi < present_rids.len() {
506                                            let p = &present_rids[pi];
507                                            if pj < 0 {
508                                                if pi == 0 {
509                                                    break;
510                                                }
511                                                pi -= 1;
512                                                pj = present_rids[pi].len() as isize - 1;
513                                                continue;
514                                            }
515                                            let pv2 = p.value(pj as usize);
516                                            if pv2 > av {
517                                                pj -= 1;
518                                            } else {
519                                                break;
520                                            }
521                                        }
522                                        let present_eq = if pi < present_rids.len() {
523                                            let p = &present_rids[pi];
524                                            if pj >= 0 {
525                                                p.value(pj as usize) == av
526                                            } else {
527                                                false
528                                            }
529                                        } else {
530                                            false
531                                        };
532                                        if !present_eq {
533                                            buf.push(av);
534                                        }
535                                        if present_eq {
536                                            pj -= 1;
537                                        }
538                                        aj -= 1;
539                                        if buf.len() >= 4096 {
540                                            flush(&mut buf)?;
541                                        }
542                                    }
543                                    if ai == 0 {
544                                        break;
545                                    }
546                                    ai -= 1;
547                                    aj = anchor_rids[ai].len() as isize - 1;
548                                }
549                                flush(&mut buf)?;
550                                Ok(())
551                            })
552                        }
553                    } else {
554                        vals_res
555                    }
556                } else {
557                    with_integer_arrow_type!(
558                        first_any.data_type().clone(),
559                        |ArrowTy| {
560                            <ArrowTy as sorted::SortedDispatch>::visit_with_rids_rev(
561                                self.pager.as_ref(),
562                                &metas_val,
563                                &metas_rid,
564                                &buffers,
565                                visitor,
566                            )
567                        },
568                        Err(Error::Internal("unsupported sorted dtype".into())),
569                    )
570                }
571            } else if paginate {
572                let mut pv =
573                    crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
574                let vals_res = with_integer_arrow_type!(
575                    first_any.data_type().clone(),
576                    |ArrowTy| {
577                        <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
578                            self.pager.as_ref(),
579                            &metas_val,
580                            &metas_rid,
581                            &buffers,
582                            &mut pv,
583                        )
584                    },
585                    Err(Error::Internal("unsupported sorted dtype".into())),
586                );
587                if opts.include_nulls {
588                    // nulls_first not meaningful without with_row_ids
589                    let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
590                    // Build anchor rid arrays (ascending)
591                    let catalog = self.catalog.read().unwrap();
592                    let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
593                    let anchor_desc_blob = self
594                        .pager
595                        .batch_get(&[BatchGet::Raw {
596                            key: anchor_desc_pk,
597                        }])?
598                        .pop()
599                        .and_then(|r| match r {
600                            GetResult::Raw { bytes, .. } => Some(bytes),
601                            _ => None,
602                        })
603                        .ok_or(Error::NotFound)?;
604                    let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
605                        anchor_desc_blob.as_ref(),
606                    );
607                    drop(catalog);
608                    let mut anchor_rids: Vec<UInt64Array> = Vec::new();
609                    let mut get_keys: Vec<BatchGet> = Vec::new();
610                    let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
611                    for m in crate::store::descriptor::DescriptorIterator::new(
612                        self.pager.as_ref(),
613                        anchor_desc.head_page_pk,
614                    ) {
615                        let mm = m?;
616                        if mm.row_count == 0 {
617                            continue;
618                        }
619                        anchor_metas.push(mm);
620                        get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
621                    }
622                    if !anchor_metas.is_empty() {
623                        let res = self.pager.batch_get(&get_keys)?;
624                        for r in res {
625                            if let GetResult::Raw { bytes, .. } = r {
626                                let any = deserialize_array(bytes)?;
627                                let arr = any
628                                    .as_any()
629                                    .downcast_ref::<UInt64Array>()
630                                    .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
631                                    .clone();
632                                anchor_rids.push(arr);
633                            }
634                        }
635                    }
636                    // Build present rids from metas_rid using buffered handles
637                    let mut present_rids: Vec<UInt64Array> = Vec::new();
638                    for mr in &metas_rid {
639                        let any = deserialize_array(
640                            buffers
641                                .rid_by_pk(mr.chunk_pk)
642                                .ok_or(Error::NotFound)?
643                                .clone(),
644                        )?;
645                        let arr = any
646                            .as_any()
647                            .downcast_ref::<UInt64Array>()
648                            .ok_or_else(|| Error::Internal("rid not u64".into()))?
649                            .clone();
650                        present_rids.push(arr);
651                    }
652                    // Two-pointer set-difference to get null rids (ascending)
653                    let mut ai = 0usize;
654                    let mut aj = 0usize; // anchor chunk/idx
655                    let mut pi = 0usize;
656                    let mut pj = 0usize; // present chunk/idx
657                    let mut buf: Vec<u64> = Vec::new();
658                    let mut flush = |v: &mut Vec<u64>| -> Result<()> {
659                        if v.is_empty() {
660                            return Ok(());
661                        }
662                        let arr = UInt64Array::from(std::mem::take(v));
663                        let len = arr.len();
664                        pv.null_run(&arr, 0, len);
665                        Ok(())
666                    };
667                    // Emit order: nulls_first => nulls then values; else values then nulls
668                    if opts.nulls_first {
669                        // Emit nulls first
670                        while ai < anchor_rids.len() {
671                            let a = &anchor_rids[ai];
672                            while aj < a.len() {
673                                let av = a.value(aj);
674                                // Advance present to >= av
675                                while pi < present_rids.len() {
676                                    let p = &present_rids[pi];
677                                    if pj >= p.len() {
678                                        pi += 1;
679                                        pj = 0;
680                                        continue;
681                                    }
682                                    let pv2 = p.value(pj);
683                                    if pv2 < av {
684                                        pj += 1;
685                                    } else {
686                                        break;
687                                    }
688                                }
689                                let present_eq = if pi < present_rids.len() {
690                                    let p = &present_rids[pi];
691                                    if pj < p.len() {
692                                        p.value(pj) == av
693                                    } else {
694                                        false
695                                    }
696                                } else {
697                                    false
698                                };
699                                if !present_eq {
700                                    buf.push(av);
701                                }
702                                if present_eq {
703                                    pj += 1;
704                                }
705                                aj += 1;
706                                if buf.len() >= 4096 {
707                                    flush(&mut buf)?;
708                                }
709                            }
710                            ai += 1;
711                            aj = 0;
712                        }
713                        flush(&mut buf)?;
714                        vals_res
715                    } else {
716                        vals_res.and_then(|_| {
717                            while ai < anchor_rids.len() {
718                                let a = &anchor_rids[ai];
719                                while aj < a.len() {
720                                    let av = a.value(aj);
721                                    while pi < present_rids.len() {
722                                        let p = &present_rids[pi];
723                                        if pj >= p.len() {
724                                            pi += 1;
725                                            pj = 0;
726                                            continue;
727                                        }
728                                        let pv2 = p.value(pj);
729                                        if pv2 < av {
730                                            pj += 1;
731                                        } else {
732                                            break;
733                                        }
734                                    }
735                                    let present_eq = if pi < present_rids.len() {
736                                        let p = &present_rids[pi];
737                                        if pj < p.len() {
738                                            p.value(pj) == av
739                                        } else {
740                                            false
741                                        }
742                                    } else {
743                                        false
744                                    };
745                                    if !present_eq {
746                                        buf.push(av);
747                                    }
748                                    if present_eq {
749                                        pj += 1;
750                                    }
751                                    aj += 1;
752                                    if buf.len() >= 4096 {
753                                        flush(&mut buf)?;
754                                    }
755                                }
756                                ai += 1;
757                                aj = 0;
758                            }
759                            flush(&mut buf)?;
760                            Ok(())
761                        })
762                    }
763                } else {
764                    vals_res
765                }
766            } else if opts.include_nulls && opts.nulls_first {
767                let anchor_fid = opts.anchor_row_id_field.ok_or_else(|| {
768                    Error::Internal("anchor_row_id_field required when include_nulls=true".into())
769                })?;
770                let catalog = self.catalog.read().unwrap();
771                let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
772                let anchor_desc_blob = self
773                    .pager
774                    .batch_get(&[BatchGet::Raw {
775                        key: anchor_desc_pk,
776                    }])?
777                    .pop()
778                    .and_then(|r| match r {
779                        GetResult::Raw { bytes, .. } => Some(bytes),
780                        _ => None,
781                    })
782                    .ok_or(Error::NotFound)?;
783                let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
784                    anchor_desc_blob.as_ref(),
785                );
786                drop(catalog);
787                let mut anchor_rids: Vec<UInt64Array> = Vec::new();
788                let mut get_keys: Vec<BatchGet> = Vec::new();
789                let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
790                for m in crate::store::descriptor::DescriptorIterator::new(
791                    self.pager.as_ref(),
792                    anchor_desc.head_page_pk,
793                ) {
794                    let mm = m?;
795                    if mm.row_count == 0 {
796                        continue;
797                    }
798                    anchor_metas.push(mm);
799                    get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
800                }
801                if !anchor_metas.is_empty() {
802                    let res = self.pager.batch_get(&get_keys)?;
803                    for r in res {
804                        if let GetResult::Raw { bytes, .. } = r {
805                            let any = deserialize_array(bytes)?;
806                            let arr = any
807                                .as_any()
808                                .downcast_ref::<UInt64Array>()
809                                .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
810                                .clone();
811                            anchor_rids.push(arr);
812                        }
813                    }
814                }
815                let mut present_rids: Vec<UInt64Array> = Vec::new();
816                for mr in &metas_rid {
817                    let any = deserialize_array(
818                        buffers
819                            .rid_by_pk(mr.chunk_pk)
820                            .ok_or(Error::NotFound)?
821                            .clone(),
822                    )?;
823                    let arr = any
824                        .as_any()
825                        .downcast_ref::<UInt64Array>()
826                        .ok_or_else(|| Error::Internal("rid not u64".into()))?
827                        .clone();
828                    present_rids.push(arr);
829                }
830                // Emit nulls first
831                let mut ai = 0usize;
832                let mut aj = 0usize;
833                let mut pi = 0usize;
834                let mut pj = 0usize;
835                let mut buf: Vec<u64> = Vec::new();
836                let mut flush = |v: &mut Vec<u64>| -> Result<()> {
837                    if v.is_empty() {
838                        return Ok(());
839                    }
840                    let arr = UInt64Array::from(std::mem::take(v));
841                    let len = arr.len();
842                    visitor.null_run(&arr, 0, len);
843                    Ok(())
844                };
845                while ai < anchor_rids.len() {
846                    let a = &anchor_rids[ai];
847                    while aj < a.len() {
848                        let av = a.value(aj);
849                        while pi < present_rids.len() {
850                            let p = &present_rids[pi];
851                            if pj >= p.len() {
852                                pi += 1;
853                                pj = 0;
854                                continue;
855                            }
856                            let pv2 = p.value(pj);
857                            if pv2 < av {
858                                pj += 1;
859                            } else {
860                                break;
861                            }
862                        }
863                        let present_eq = if pi < present_rids.len() {
864                            let p = &present_rids[pi];
865                            if pj < p.len() {
866                                p.value(pj) == av
867                            } else {
868                                false
869                            }
870                        } else {
871                            false
872                        };
873                        if !present_eq {
874                            buf.push(av);
875                        }
876                        if present_eq {
877                            pj += 1;
878                        }
879                        aj += 1;
880                        if buf.len() >= 4096 {
881                            flush(&mut buf)?;
882                        }
883                    }
884                    ai += 1;
885                    aj = 0;
886                }
887                flush(&mut buf)?;
888                // Then emit values
889                with_integer_arrow_type!(
890                    first_any.data_type().clone(),
891                    |ArrowTy| {
892                        <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
893                            self.pager.as_ref(),
894                            &metas_val,
895                            &metas_rid,
896                            &buffers,
897                            visitor,
898                        )
899                    },
900                    Err(Error::Internal("unsupported sorted dtype".into())),
901                )
902            } else {
903                // Values first
904                let res = with_integer_arrow_type!(
905                    first_any.data_type().clone(),
906                    |ArrowTy| {
907                        <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
908                            self.pager.as_ref(),
909                            &metas_val,
910                            &metas_rid,
911                            &buffers,
912                            visitor,
913                        )
914                    },
915                    Err(Error::Internal("unsupported sorted dtype".into())),
916                );
917                if opts.include_nulls {
918                    res?;
919                    let anchor_fid = opts.anchor_row_id_field.ok_or_else(|| {
920                        Error::Internal(
921                            "anchor_row_id_field required when include_nulls=true".into(),
922                        )
923                    })?;
924                    let catalog = self.catalog.read().unwrap();
925                    let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
926                    let anchor_desc_blob = self
927                        .pager
928                        .batch_get(&[BatchGet::Raw {
929                            key: anchor_desc_pk,
930                        }])?
931                        .pop()
932                        .and_then(|r| match r {
933                            GetResult::Raw { bytes, .. } => Some(bytes),
934                            _ => None,
935                        })
936                        .ok_or(Error::NotFound)?;
937                    let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
938                        anchor_desc_blob.as_ref(),
939                    );
940                    drop(catalog);
941                    let mut anchor_rids: Vec<UInt64Array> = Vec::new();
942                    let mut get_keys: Vec<BatchGet> = Vec::new();
943                    let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
944                    for m in crate::store::descriptor::DescriptorIterator::new(
945                        self.pager.as_ref(),
946                        anchor_desc.head_page_pk,
947                    ) {
948                        let mm = m?;
949                        if mm.row_count == 0 {
950                            continue;
951                        }
952                        anchor_metas.push(mm);
953                        get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
954                    }
955                    if !anchor_metas.is_empty() {
956                        let res_get = self.pager.batch_get(&get_keys)?;
957                        for r in res_get {
958                            if let GetResult::Raw { bytes, .. } = r {
959                                let any = deserialize_array(bytes)?;
960                                let arr = any
961                                    .as_any()
962                                    .downcast_ref::<UInt64Array>()
963                                    .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
964                                    .clone();
965                                anchor_rids.push(arr);
966                            }
967                        }
968                    }
969                    let mut present_rids: Vec<UInt64Array> = Vec::new();
970                    for mr in &metas_rid {
971                        let any = deserialize_array(
972                            buffers
973                                .rid_by_pk(mr.chunk_pk)
974                                .ok_or(Error::NotFound)?
975                                .clone(),
976                        )?;
977                        let arr = any
978                            .as_any()
979                            .downcast_ref::<UInt64Array>()
980                            .ok_or_else(|| Error::Internal("rid not u64".into()))?
981                            .clone();
982                        present_rids.push(arr);
983                    }
984                    let mut ai = 0usize;
985                    let mut aj = 0usize;
986                    let mut pi = 0usize;
987                    let mut pj = 0usize;
988                    let mut buf: Vec<u64> = Vec::new();
989                    let mut flush = |v: &mut Vec<u64>| -> Result<()> {
990                        if v.is_empty() {
991                            return Ok(());
992                        }
993                        let arr = UInt64Array::from(std::mem::take(v));
994                        let len = arr.len();
995                        visitor.null_run(&arr, 0, len);
996                        Ok(())
997                    };
998                    while ai < anchor_rids.len() {
999                        let a = &anchor_rids[ai];
1000                        while aj < a.len() {
1001                            let av = a.value(aj);
1002                            while pi < present_rids.len() {
1003                                let p = &present_rids[pi];
1004                                if pj >= p.len() {
1005                                    pi += 1;
1006                                    pj = 0;
1007                                    continue;
1008                                }
1009                                let pv2 = p.value(pj);
1010                                if pv2 < av {
1011                                    pj += 1;
1012                                } else {
1013                                    break;
1014                                }
1015                            }
1016                            let present_eq = if pi < present_rids.len() {
1017                                let p = &present_rids[pi];
1018                                if pj < p.len() {
1019                                    p.value(pj) == av
1020                                } else {
1021                                    false
1022                                }
1023                            } else {
1024                                false
1025                            };
1026                            if !present_eq {
1027                                buf.push(av);
1028                            }
1029                            if present_eq {
1030                                pj += 1;
1031                            }
1032                            aj += 1;
1033                            if buf.len() >= 4096 {
1034                                flush(&mut buf)?;
1035                            }
1036                        }
1037                        ai += 1;
1038                        aj = 0;
1039                    }
1040                    flush(&mut buf)?;
1041                    Ok(())
1042                } else {
1043                    res
1044                }
1045            }
1046        } else if opts.reverse {
1047            if paginate {
1048                let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
1049                    visitor,
1050                    opts.offset,
1051                    opts.limit,
1052                    true,
1053                );
1054                self.scan_sorted_visit_reverse(field_id, &mut pv)
1055            } else {
1056                self.scan_sorted_visit_reverse(field_id, visitor)
1057            }
1058        } else if paginate {
1059            let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
1060                visitor,
1061                opts.offset,
1062                opts.limit,
1063                false,
1064            );
1065            self.scan_sorted_visit(field_id, &mut pv)
1066        } else {
1067            self.scan_sorted_visit(field_id, visitor)
1068        }
1069    }
1070}