llkv_column_map/store/scan/
mod.rs

1//! Primitive column scanners and visitor traits used by ColumnStore.
2//!
3//! Focus: integer primitives with fast unsorted and sorted scans.
4//! - Monomorphized per-type paths (no dynamic dispatch in hot loops)
5//! - Batches IO with pager, minimal object churn
6//! - Coalesces adjacent items into runs for sorted scans
7//! - Optional row-id variants
8//!
9//! This module is intentionally self-contained so higher-level methods in
10//! `store::mod` can delegate without code duplication.
11
12use std::cmp::Ordering;
13use std::collections::BinaryHeap;
14use std::ops::{Bound, RangeBounds};
15
16use arrow::array::*;
17use arrow::compute;
18
19use super::ColumnStore;
20use crate::serialization::deserialize_array;
21use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
22use llkv_result::{Error, Result};
23use llkv_storage::{
24    pager::{BatchGet, GetResult, Pager},
25    types::PhysicalKey,
26};
27use llkv_types::ids::{LogicalFieldId, LogicalStorageNamespace};
28use simd_r_drive_entry_handle::EntryHandle;
29
30pub mod builder;
31pub use builder::*;
32
33pub mod kmerge;
34pub use kmerge::*;
35
36pub mod options;
37pub use options::*;
38
39pub mod unsorted;
40pub use unsorted::*;
41
42pub mod ranges;
43pub use ranges::*;
44
45pub mod sorted;
46pub use sorted::*;
47
48pub mod visitors;
49pub use visitors::*;
50
51pub mod filter;
52pub use filter::*;
53
54impl<P> ColumnStore<P>
55where
56    P: Pager<Blob = EntryHandle>,
57{
58    /// Unsorted scan with typed visitor callbacks per chunk.
59    fn scan_visit(
60        &self,
61        field_id: LogicalFieldId,
62        visitor: &mut dyn crate::store::scan::PrimitiveVisitor,
63    ) -> Result<()> {
64        let catalog = self.catalog.read().unwrap();
65        crate::store::scan::unsorted_visit(self.pager.as_ref(), &catalog.map, field_id, visitor)
66    }
67
68    /// Convenience: sorted scan with closures over coalesced runs.
69    /// Sorted scan with typed visitor callbacks over coalesced runs.
70    fn scan_sorted_visit(
71        &self,
72        field_id: LogicalFieldId,
73        visitor: &mut dyn crate::store::scan::PrimitiveSortedVisitor,
74    ) -> Result<()> {
75        let catalog = self.catalog.read().unwrap();
76        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
77        let desc_blob = self
78            .pager
79            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
80            .pop()
81            .and_then(|r| match r {
82                GetResult::Raw { bytes, .. } => Some(bytes),
83                _ => None,
84            })
85            .ok_or(Error::NotFound)?;
86        let desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
87        drop(catalog);
88
89        let mut metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
90        for m in crate::store::descriptor::DescriptorIterator::new(
91            self.pager.as_ref(),
92            desc.head_page_pk,
93        ) {
94            let meta = m?;
95            if meta.row_count == 0 {
96                continue;
97            }
98            if meta.value_order_perm_pk == 0 {
99                return Err(Error::NotFound);
100            }
101            metas.push(meta);
102        }
103        if metas.is_empty() {
104            return Ok(());
105        }
106        let buffers = sorted::load_sorted_buffers(self.pager.as_ref(), &metas)?;
107        let first_any = deserialize_array(buffers.value_handle(0).clone())?;
108        with_integer_arrow_type!(
109            first_any.data_type().clone(),
110            |ArrowTy| {
111                <ArrowTy as sorted::SortedDispatch>::visit(
112                    self.pager.as_ref(),
113                    &metas,
114                    &buffers,
115                    visitor,
116                )
117            },
118            Err(Error::Internal("unsupported sorted dtype".into())),
119        )
120    }
121
122    /// Sorted scan in reverse (descending) with typed visitor callbacks.
123    fn scan_sorted_visit_reverse(
124        &self,
125        field_id: LogicalFieldId,
126        visitor: &mut dyn crate::store::scan::PrimitiveSortedVisitor,
127    ) -> Result<()> {
128        let catalog = self.catalog.read().unwrap();
129        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
130        let desc_blob = self
131            .pager
132            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
133            .pop()
134            .and_then(|r| match r {
135                GetResult::Raw { bytes, .. } => Some(bytes),
136                _ => None,
137            })
138            .ok_or(Error::NotFound)?;
139        let desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
140        drop(catalog);
141
142        let mut metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
143        for m in crate::store::descriptor::DescriptorIterator::new(
144            self.pager.as_ref(),
145            desc.head_page_pk,
146        ) {
147            let meta = m?;
148            if meta.row_count == 0 {
149                continue;
150            }
151            if meta.value_order_perm_pk == 0 {
152                return Err(Error::NotFound);
153            }
154            metas.push(meta);
155        }
156        if metas.is_empty() {
157            return Ok(());
158        }
159        let mut gets: Vec<BatchGet> = Vec::with_capacity(metas.len() * 2);
160        for m in &metas {
161            gets.push(BatchGet::Raw { key: m.chunk_pk });
162            gets.push(BatchGet::Raw {
163                key: m.value_order_perm_pk,
164            });
165        }
166        let buffers = sorted::load_sorted_buffers(self.pager.as_ref(), &metas)?;
167        let first_any = crate::serialization::deserialize_array(buffers.value_handle(0).clone())?;
168        with_integer_arrow_type!(
169            first_any.data_type().clone(),
170            |ArrowTy| {
171                <ArrowTy as sorted::SortedDispatch>::visit_rev(
172                    self.pager.as_ref(),
173                    &metas,
174                    &buffers,
175                    visitor,
176                )
177            },
178            Err(Error::Internal("unsupported sorted dtype".into())),
179        )
180    }
181
182    /// Unified scan entrypoint configured by ScanOptions.
183    /// Requires `V` to implement both unsorted and sorted visitor traits; methods are no-ops by default.
184    pub fn scan(
185        &self,
186        field_id: LogicalFieldId,
187        opts: ScanOptions,
188        visitor: &mut dyn crate::store::scan::PrimitiveFullVisitor,
189    ) -> Result<()> {
190        let paginate = opts.offset > 0 || opts.limit.is_some();
191        if !opts.sorted {
192            if opts.with_row_ids {
193                let row_fid = field_id.with_namespace(LogicalStorageNamespace::RowIdShadow);
194                let catalog = self.catalog.read().unwrap();
195                if opts.include_nulls {
196                    let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
197                    if paginate {
198                        let mut pv = crate::store::scan::PaginateVisitor::new(
199                            visitor,
200                            opts.offset,
201                            opts.limit,
202                        );
203                        return crate::store::scan::unsorted_with_row_ids_and_nulls_visit(
204                            self.pager.as_ref(),
205                            &catalog.map,
206                            field_id,
207                            row_fid,
208                            anchor_fid,
209                            opts.nulls_first,
210                            &mut pv,
211                        );
212                    } else {
213                        return crate::store::scan::unsorted_with_row_ids_and_nulls_visit(
214                            self.pager.as_ref(),
215                            &catalog.map,
216                            field_id,
217                            row_fid,
218                            anchor_fid,
219                            opts.nulls_first,
220                            visitor,
221                        );
222                    }
223                }
224                if paginate {
225                    let mut pv =
226                        crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
227                    return crate::store::scan::unsorted_with_row_ids_visit(
228                        self.pager.as_ref(),
229                        &catalog.map,
230                        field_id,
231                        row_fid,
232                        &mut pv,
233                    );
234                } else {
235                    return crate::store::scan::unsorted_with_row_ids_visit(
236                        self.pager.as_ref(),
237                        &catalog.map,
238                        field_id,
239                        row_fid,
240                        visitor,
241                    );
242                }
243            }
244            if paginate {
245                let mut pv =
246                    crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
247                return self.scan_visit(field_id, &mut pv);
248            } else {
249                return self.scan_visit(field_id, visitor);
250            }
251        }
252
253        if opts.with_row_ids {
254            let row_fid = field_id.with_namespace(LogicalStorageNamespace::RowIdShadow);
255            // Prepare value metas and blobs
256            let catalog = self.catalog.read().unwrap();
257            let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
258            let desc_blob = self
259                .pager
260                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
261                .pop()
262                .and_then(|r| match r {
263                    GetResult::Raw { bytes, .. } => Some(bytes),
264                    _ => None,
265                })
266                .ok_or(Error::NotFound)?;
267            let desc =
268                crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
269
270            let rid_descriptor_pk = *catalog.map.get(&row_fid).ok_or(Error::NotFound)?;
271            let rid_desc_blob = self
272                .pager
273                .batch_get(&[BatchGet::Raw {
274                    key: rid_descriptor_pk,
275                }])?
276                .pop()
277                .and_then(|r| match r {
278                    GetResult::Raw { bytes, .. } => Some(bytes),
279                    _ => None,
280                })
281                .ok_or(Error::NotFound)?;
282            let rid_desc =
283                crate::store::descriptor::ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
284            drop(catalog);
285
286            let mut metas_val: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
287            for m in crate::store::descriptor::DescriptorIterator::new(
288                self.pager.as_ref(),
289                desc.head_page_pk,
290            ) {
291                let meta = m?;
292                if meta.row_count == 0 {
293                    continue;
294                }
295                if meta.value_order_perm_pk == 0 {
296                    return Err(Error::NotFound);
297                }
298                metas_val.push(meta);
299            }
300            let mut metas_rid: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
301            for m in crate::store::descriptor::DescriptorIterator::new(
302                self.pager.as_ref(),
303                rid_desc.head_page_pk,
304            ) {
305                let meta = m?;
306                if meta.row_count == 0 {
307                    continue;
308                }
309                metas_rid.push(meta);
310            }
311            if metas_val.is_empty() {
312                return Ok(());
313            }
314            if metas_val.len() != metas_rid.len() {
315                return Err(Error::Internal(
316                    "sorted_with_row_ids: chunk count mismatch".into(),
317                ));
318            }
319
320            let buffers =
321                sorted::load_sorted_buffers_with_rids(self.pager.as_ref(), &metas_val, &metas_rid)?;
322            let first_any =
323                crate::serialization::deserialize_array(buffers.base().value_handle(0).clone())?;
324            if opts.reverse {
325                if paginate {
326                    let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
327                        visitor,
328                        opts.offset,
329                        opts.limit,
330                        true,
331                    );
332                    let vals_res = with_integer_arrow_type!(
333                        first_any.data_type().clone(),
334                        |ArrowTy| {
335                            <ArrowTy as sorted::SortedDispatch>::visit_with_rids_rev(
336                                self.pager.as_ref(),
337                                &metas_val,
338                                &metas_rid,
339                                &buffers,
340                                &mut pv,
341                            )
342                        },
343                        Err(Error::Internal("unsupported sorted dtype".into())),
344                    );
345                    if opts.include_nulls {
346                        let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
347                        let catalog = self.catalog.read().unwrap();
348                        let anchor_desc_pk =
349                            *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
350                        let anchor_desc_blob = self
351                            .pager
352                            .batch_get(&[BatchGet::Raw {
353                                key: anchor_desc_pk,
354                            }])?
355                            .pop()
356                            .and_then(|r| match r {
357                                GetResult::Raw { bytes, .. } => Some(bytes),
358                                _ => None,
359                            })
360                            .ok_or(Error::NotFound)?;
361                        let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
362                            anchor_desc_blob.as_ref(),
363                        );
364                        drop(catalog);
365                        let mut anchor_rids: Vec<UInt64Array> = Vec::new();
366                        let mut get_keys: Vec<BatchGet> = Vec::new();
367                        let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> =
368                            Vec::new();
369                        for m in crate::store::descriptor::DescriptorIterator::new(
370                            self.pager.as_ref(),
371                            anchor_desc.head_page_pk,
372                        ) {
373                            let mm = m?;
374                            if mm.row_count == 0 {
375                                continue;
376                            }
377                            anchor_metas.push(mm);
378                            get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
379                        }
380                        if !anchor_metas.is_empty() {
381                            let res = self.pager.batch_get(&get_keys)?;
382                            for r in res {
383                                if let GetResult::Raw { bytes, .. } = r {
384                                    let any = deserialize_array(bytes)?;
385                                    let arr = any
386                                        .as_any()
387                                        .downcast_ref::<UInt64Array>()
388                                        .ok_or_else(|| {
389                                            Error::Internal("anchor rid not u64".into())
390                                        })?
391                                        .clone();
392                                    anchor_rids.push(arr);
393                                }
394                            }
395                        }
396                        let mut present_rids: Vec<UInt64Array> = Vec::new();
397                        for mr in &metas_rid {
398                            let any = deserialize_array(
399                                buffers
400                                    .rid_by_pk(mr.chunk_pk)
401                                    .ok_or(Error::NotFound)?
402                                    .clone(),
403                            )?;
404                            let arr = any
405                                .as_any()
406                                .downcast_ref::<UInt64Array>()
407                                .ok_or_else(|| Error::Internal("rid not u64".into()))?
408                                .clone();
409                            present_rids.push(arr);
410                        }
411                        let mut ai = anchor_rids.len();
412                        let mut aj: isize = -1; // reverse traversal
413                        let mut pi = present_rids.len();
414                        let mut pj: isize = -1;
415                        let mut buf: Vec<u64> = Vec::new();
416                        let mut flush = |v: &mut Vec<u64>| -> Result<()> {
417                            if v.is_empty() {
418                                return Ok(());
419                            }
420                            let arr = UInt64Array::from(std::mem::take(v));
421                            let len = arr.len();
422                            pv.null_run(&arr, 0, len);
423                            Ok(())
424                        };
425                        // Emit order: nulls_first => nulls then values; else values then nulls
426                        if opts.nulls_first {
427                            // Emit nulls first (descending rid order)
428                            if ai > 0 {
429                                ai -= 1;
430                                aj = anchor_rids[ai].len() as isize - 1;
431                            }
432                            if pi > 0 {
433                                pi -= 1;
434                                pj = present_rids[pi].len() as isize - 1;
435                            }
436                            while ai < anchor_rids.len() {
437                                // ai unsigned; exit when wrap logic ends
438                                let a = &anchor_rids[ai];
439                                while aj >= 0 {
440                                    let av = a.value(aj as usize);
441                                    // advance present down to <= av
442                                    while pi < present_rids.len() {
443                                        let p = &present_rids[pi];
444                                        if pj < 0 {
445                                            if pi == 0 {
446                                                break;
447                                            }
448                                            pi -= 1;
449                                            pj = present_rids[pi].len() as isize - 1;
450                                            continue;
451                                        }
452                                        let pv2 = p.value(pj as usize);
453                                        if pv2 > av {
454                                            pj -= 1;
455                                        } else {
456                                            break;
457                                        }
458                                    }
459                                    let present_eq = if pi < present_rids.len() {
460                                        let p = &present_rids[pi];
461                                        if pj >= 0 {
462                                            p.value(pj as usize) == av
463                                        } else {
464                                            false
465                                        }
466                                    } else {
467                                        false
468                                    };
469                                    if !present_eq {
470                                        buf.push(av);
471                                    }
472                                    if present_eq {
473                                        pj -= 1;
474                                    }
475                                    aj -= 1;
476                                    if buf.len() >= 4096 {
477                                        flush(&mut buf)?;
478                                    }
479                                }
480                                if ai == 0 {
481                                    break;
482                                }
483                                ai -= 1;
484                                aj = anchor_rids[ai].len() as isize - 1;
485                            }
486                            flush(&mut buf)?;
487                            vals_res
488                        } else {
489                            vals_res.and_then(|_| {
490                                if ai > 0 {
491                                    ai -= 1;
492                                    aj = anchor_rids[ai].len() as isize - 1;
493                                }
494                                if pi > 0 {
495                                    pi -= 1;
496                                    pj = present_rids[pi].len() as isize - 1;
497                                }
498                                while ai < anchor_rids.len() {
499                                    let a = &anchor_rids[ai];
500                                    while aj >= 0 {
501                                        let av = a.value(aj as usize);
502                                        while pi < present_rids.len() {
503                                            let p = &present_rids[pi];
504                                            if pj < 0 {
505                                                if pi == 0 {
506                                                    break;
507                                                }
508                                                pi -= 1;
509                                                pj = present_rids[pi].len() as isize - 1;
510                                                continue;
511                                            }
512                                            let pv2 = p.value(pj as usize);
513                                            if pv2 > av {
514                                                pj -= 1;
515                                            } else {
516                                                break;
517                                            }
518                                        }
519                                        let present_eq = if pi < present_rids.len() {
520                                            let p = &present_rids[pi];
521                                            if pj >= 0 {
522                                                p.value(pj as usize) == av
523                                            } else {
524                                                false
525                                            }
526                                        } else {
527                                            false
528                                        };
529                                        if !present_eq {
530                                            buf.push(av);
531                                        }
532                                        if present_eq {
533                                            pj -= 1;
534                                        }
535                                        aj -= 1;
536                                        if buf.len() >= 4096 {
537                                            flush(&mut buf)?;
538                                        }
539                                    }
540                                    if ai == 0 {
541                                        break;
542                                    }
543                                    ai -= 1;
544                                    aj = anchor_rids[ai].len() as isize - 1;
545                                }
546                                flush(&mut buf)?;
547                                Ok(())
548                            })
549                        }
550                    } else {
551                        vals_res
552                    }
553                } else {
554                    with_integer_arrow_type!(
555                        first_any.data_type().clone(),
556                        |ArrowTy| {
557                            <ArrowTy as sorted::SortedDispatch>::visit_with_rids_rev(
558                                self.pager.as_ref(),
559                                &metas_val,
560                                &metas_rid,
561                                &buffers,
562                                visitor,
563                            )
564                        },
565                        Err(Error::Internal("unsupported sorted dtype".into())),
566                    )
567                }
568            } else if paginate {
569                let mut pv =
570                    crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
571                let vals_res = with_integer_arrow_type!(
572                    first_any.data_type().clone(),
573                    |ArrowTy| {
574                        <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
575                            self.pager.as_ref(),
576                            &metas_val,
577                            &metas_rid,
578                            &buffers,
579                            &mut pv,
580                        )
581                    },
582                    Err(Error::Internal("unsupported sorted dtype".into())),
583                );
584                if opts.include_nulls {
585                    // nulls_first not meaningful without with_row_ids
586                    let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
587                    // Build anchor rid arrays (ascending)
588                    let catalog = self.catalog.read().unwrap();
589                    let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
590                    let anchor_desc_blob = self
591                        .pager
592                        .batch_get(&[BatchGet::Raw {
593                            key: anchor_desc_pk,
594                        }])?
595                        .pop()
596                        .and_then(|r| match r {
597                            GetResult::Raw { bytes, .. } => Some(bytes),
598                            _ => None,
599                        })
600                        .ok_or(Error::NotFound)?;
601                    let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
602                        anchor_desc_blob.as_ref(),
603                    );
604                    drop(catalog);
605                    let mut anchor_rids: Vec<UInt64Array> = Vec::new();
606                    let mut get_keys: Vec<BatchGet> = Vec::new();
607                    let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
608                    for m in crate::store::descriptor::DescriptorIterator::new(
609                        self.pager.as_ref(),
610                        anchor_desc.head_page_pk,
611                    ) {
612                        let mm = m?;
613                        if mm.row_count == 0 {
614                            continue;
615                        }
616                        anchor_metas.push(mm);
617                        get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
618                    }
619                    if !anchor_metas.is_empty() {
620                        let res = self.pager.batch_get(&get_keys)?;
621                        for r in res {
622                            if let GetResult::Raw { bytes, .. } = r {
623                                let any = deserialize_array(bytes)?;
624                                let arr = any
625                                    .as_any()
626                                    .downcast_ref::<UInt64Array>()
627                                    .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
628                                    .clone();
629                                anchor_rids.push(arr);
630                            }
631                        }
632                    }
633                    // Build present rids from metas_rid using buffered handles
634                    let mut present_rids: Vec<UInt64Array> = Vec::new();
635                    for mr in &metas_rid {
636                        let any = deserialize_array(
637                            buffers
638                                .rid_by_pk(mr.chunk_pk)
639                                .ok_or(Error::NotFound)?
640                                .clone(),
641                        )?;
642                        let arr = any
643                            .as_any()
644                            .downcast_ref::<UInt64Array>()
645                            .ok_or_else(|| Error::Internal("rid not u64".into()))?
646                            .clone();
647                        present_rids.push(arr);
648                    }
649                    // Two-pointer set-difference to get null rids (ascending)
650                    let mut ai = 0usize;
651                    let mut aj = 0usize; // anchor chunk/idx
652                    let mut pi = 0usize;
653                    let mut pj = 0usize; // present chunk/idx
654                    let mut buf: Vec<u64> = Vec::new();
655                    let mut flush = |v: &mut Vec<u64>| -> Result<()> {
656                        if v.is_empty() {
657                            return Ok(());
658                        }
659                        let arr = UInt64Array::from(std::mem::take(v));
660                        let len = arr.len();
661                        pv.null_run(&arr, 0, len);
662                        Ok(())
663                    };
664                    // Emit order: nulls_first => nulls then values; else values then nulls
665                    if opts.nulls_first {
666                        // Emit nulls first
667                        while ai < anchor_rids.len() {
668                            let a = &anchor_rids[ai];
669                            while aj < a.len() {
670                                let av = a.value(aj);
671                                // Advance present to >= av
672                                while pi < present_rids.len() {
673                                    let p = &present_rids[pi];
674                                    if pj >= p.len() {
675                                        pi += 1;
676                                        pj = 0;
677                                        continue;
678                                    }
679                                    let pv2 = p.value(pj);
680                                    if pv2 < av {
681                                        pj += 1;
682                                    } else {
683                                        break;
684                                    }
685                                }
686                                let present_eq = if pi < present_rids.len() {
687                                    let p = &present_rids[pi];
688                                    if pj < p.len() {
689                                        p.value(pj) == av
690                                    } else {
691                                        false
692                                    }
693                                } else {
694                                    false
695                                };
696                                if !present_eq {
697                                    buf.push(av);
698                                }
699                                if present_eq {
700                                    pj += 1;
701                                }
702                                aj += 1;
703                                if buf.len() >= 4096 {
704                                    flush(&mut buf)?;
705                                }
706                            }
707                            ai += 1;
708                            aj = 0;
709                        }
710                        flush(&mut buf)?;
711                        vals_res
712                    } else {
713                        vals_res.and_then(|_| {
714                            while ai < anchor_rids.len() {
715                                let a = &anchor_rids[ai];
716                                while aj < a.len() {
717                                    let av = a.value(aj);
718                                    while pi < present_rids.len() {
719                                        let p = &present_rids[pi];
720                                        if pj >= p.len() {
721                                            pi += 1;
722                                            pj = 0;
723                                            continue;
724                                        }
725                                        let pv2 = p.value(pj);
726                                        if pv2 < av {
727                                            pj += 1;
728                                        } else {
729                                            break;
730                                        }
731                                    }
732                                    let present_eq = if pi < present_rids.len() {
733                                        let p = &present_rids[pi];
734                                        if pj < p.len() {
735                                            p.value(pj) == av
736                                        } else {
737                                            false
738                                        }
739                                    } else {
740                                        false
741                                    };
742                                    if !present_eq {
743                                        buf.push(av);
744                                    }
745                                    if present_eq {
746                                        pj += 1;
747                                    }
748                                    aj += 1;
749                                    if buf.len() >= 4096 {
750                                        flush(&mut buf)?;
751                                    }
752                                }
753                                ai += 1;
754                                aj = 0;
755                            }
756                            flush(&mut buf)?;
757                            Ok(())
758                        })
759                    }
760                } else {
761                    vals_res
762                }
763            } else if opts.include_nulls && opts.nulls_first {
764                let anchor_fid = opts.anchor_row_id_field.ok_or_else(|| {
765                    Error::Internal("anchor_row_id_field required when include_nulls=true".into())
766                })?;
767                let catalog = self.catalog.read().unwrap();
768                let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
769                let anchor_desc_blob = self
770                    .pager
771                    .batch_get(&[BatchGet::Raw {
772                        key: anchor_desc_pk,
773                    }])?
774                    .pop()
775                    .and_then(|r| match r {
776                        GetResult::Raw { bytes, .. } => Some(bytes),
777                        _ => None,
778                    })
779                    .ok_or(Error::NotFound)?;
780                let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
781                    anchor_desc_blob.as_ref(),
782                );
783                drop(catalog);
784                let mut anchor_rids: Vec<UInt64Array> = Vec::new();
785                let mut get_keys: Vec<BatchGet> = Vec::new();
786                let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
787                for m in crate::store::descriptor::DescriptorIterator::new(
788                    self.pager.as_ref(),
789                    anchor_desc.head_page_pk,
790                ) {
791                    let mm = m?;
792                    if mm.row_count == 0 {
793                        continue;
794                    }
795                    anchor_metas.push(mm);
796                    get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
797                }
798                if !anchor_metas.is_empty() {
799                    let res = self.pager.batch_get(&get_keys)?;
800                    for r in res {
801                        if let GetResult::Raw { bytes, .. } = r {
802                            let any = deserialize_array(bytes)?;
803                            let arr = any
804                                .as_any()
805                                .downcast_ref::<UInt64Array>()
806                                .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
807                                .clone();
808                            anchor_rids.push(arr);
809                        }
810                    }
811                }
812                let mut present_rids: Vec<UInt64Array> = Vec::new();
813                for mr in &metas_rid {
814                    let any = deserialize_array(
815                        buffers
816                            .rid_by_pk(mr.chunk_pk)
817                            .ok_or(Error::NotFound)?
818                            .clone(),
819                    )?;
820                    let arr = any
821                        .as_any()
822                        .downcast_ref::<UInt64Array>()
823                        .ok_or_else(|| Error::Internal("rid not u64".into()))?
824                        .clone();
825                    present_rids.push(arr);
826                }
827                // Emit nulls first
828                let mut ai = 0usize;
829                let mut aj = 0usize;
830                let mut pi = 0usize;
831                let mut pj = 0usize;
832                let mut buf: Vec<u64> = Vec::new();
833                let mut flush = |v: &mut Vec<u64>| -> Result<()> {
834                    if v.is_empty() {
835                        return Ok(());
836                    }
837                    let arr = UInt64Array::from(std::mem::take(v));
838                    let len = arr.len();
839                    visitor.null_run(&arr, 0, len);
840                    Ok(())
841                };
842                while ai < anchor_rids.len() {
843                    let a = &anchor_rids[ai];
844                    while aj < a.len() {
845                        let av = a.value(aj);
846                        while pi < present_rids.len() {
847                            let p = &present_rids[pi];
848                            if pj >= p.len() {
849                                pi += 1;
850                                pj = 0;
851                                continue;
852                            }
853                            let pv2 = p.value(pj);
854                            if pv2 < av {
855                                pj += 1;
856                            } else {
857                                break;
858                            }
859                        }
860                        let present_eq = if pi < present_rids.len() {
861                            let p = &present_rids[pi];
862                            if pj < p.len() {
863                                p.value(pj) == av
864                            } else {
865                                false
866                            }
867                        } else {
868                            false
869                        };
870                        if !present_eq {
871                            buf.push(av);
872                        }
873                        if present_eq {
874                            pj += 1;
875                        }
876                        aj += 1;
877                        if buf.len() >= 4096 {
878                            flush(&mut buf)?;
879                        }
880                    }
881                    ai += 1;
882                    aj = 0;
883                }
884                flush(&mut buf)?;
885                // Then emit values
886                with_integer_arrow_type!(
887                    first_any.data_type().clone(),
888                    |ArrowTy| {
889                        <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
890                            self.pager.as_ref(),
891                            &metas_val,
892                            &metas_rid,
893                            &buffers,
894                            visitor,
895                        )
896                    },
897                    Err(Error::Internal("unsupported sorted dtype".into())),
898                )
899            } else {
900                // Values first
901                let res = with_integer_arrow_type!(
902                    first_any.data_type().clone(),
903                    |ArrowTy| {
904                        <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
905                            self.pager.as_ref(),
906                            &metas_val,
907                            &metas_rid,
908                            &buffers,
909                            visitor,
910                        )
911                    },
912                    Err(Error::Internal("unsupported sorted dtype".into())),
913                );
914                if opts.include_nulls {
915                    res?;
916                    let anchor_fid = opts.anchor_row_id_field.ok_or_else(|| {
917                        Error::Internal(
918                            "anchor_row_id_field required when include_nulls=true".into(),
919                        )
920                    })?;
921                    let catalog = self.catalog.read().unwrap();
922                    let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
923                    let anchor_desc_blob = self
924                        .pager
925                        .batch_get(&[BatchGet::Raw {
926                            key: anchor_desc_pk,
927                        }])?
928                        .pop()
929                        .and_then(|r| match r {
930                            GetResult::Raw { bytes, .. } => Some(bytes),
931                            _ => None,
932                        })
933                        .ok_or(Error::NotFound)?;
934                    let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
935                        anchor_desc_blob.as_ref(),
936                    );
937                    drop(catalog);
938                    let mut anchor_rids: Vec<UInt64Array> = Vec::new();
939                    let mut get_keys: Vec<BatchGet> = Vec::new();
940                    let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
941                    for m in crate::store::descriptor::DescriptorIterator::new(
942                        self.pager.as_ref(),
943                        anchor_desc.head_page_pk,
944                    ) {
945                        let mm = m?;
946                        if mm.row_count == 0 {
947                            continue;
948                        }
949                        anchor_metas.push(mm);
950                        get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
951                    }
952                    if !anchor_metas.is_empty() {
953                        let res_get = self.pager.batch_get(&get_keys)?;
954                        for r in res_get {
955                            if let GetResult::Raw { bytes, .. } = r {
956                                let any = deserialize_array(bytes)?;
957                                let arr = any
958                                    .as_any()
959                                    .downcast_ref::<UInt64Array>()
960                                    .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
961                                    .clone();
962                                anchor_rids.push(arr);
963                            }
964                        }
965                    }
966                    let mut present_rids: Vec<UInt64Array> = Vec::new();
967                    for mr in &metas_rid {
968                        let any = deserialize_array(
969                            buffers
970                                .rid_by_pk(mr.chunk_pk)
971                                .ok_or(Error::NotFound)?
972                                .clone(),
973                        )?;
974                        let arr = any
975                            .as_any()
976                            .downcast_ref::<UInt64Array>()
977                            .ok_or_else(|| Error::Internal("rid not u64".into()))?
978                            .clone();
979                        present_rids.push(arr);
980                    }
981                    let mut ai = 0usize;
982                    let mut aj = 0usize;
983                    let mut pi = 0usize;
984                    let mut pj = 0usize;
985                    let mut buf: Vec<u64> = Vec::new();
986                    let mut flush = |v: &mut Vec<u64>| -> Result<()> {
987                        if v.is_empty() {
988                            return Ok(());
989                        }
990                        let arr = UInt64Array::from(std::mem::take(v));
991                        let len = arr.len();
992                        visitor.null_run(&arr, 0, len);
993                        Ok(())
994                    };
995                    while ai < anchor_rids.len() {
996                        let a = &anchor_rids[ai];
997                        while aj < a.len() {
998                            let av = a.value(aj);
999                            while pi < present_rids.len() {
1000                                let p = &present_rids[pi];
1001                                if pj >= p.len() {
1002                                    pi += 1;
1003                                    pj = 0;
1004                                    continue;
1005                                }
1006                                let pv2 = p.value(pj);
1007                                if pv2 < av {
1008                                    pj += 1;
1009                                } else {
1010                                    break;
1011                                }
1012                            }
1013                            let present_eq = if pi < present_rids.len() {
1014                                let p = &present_rids[pi];
1015                                if pj < p.len() {
1016                                    p.value(pj) == av
1017                                } else {
1018                                    false
1019                                }
1020                            } else {
1021                                false
1022                            };
1023                            if !present_eq {
1024                                buf.push(av);
1025                            }
1026                            if present_eq {
1027                                pj += 1;
1028                            }
1029                            aj += 1;
1030                            if buf.len() >= 4096 {
1031                                flush(&mut buf)?;
1032                            }
1033                        }
1034                        ai += 1;
1035                        aj = 0;
1036                    }
1037                    flush(&mut buf)?;
1038                    Ok(())
1039                } else {
1040                    res
1041                }
1042            }
1043        } else if opts.reverse {
1044            if paginate {
1045                let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
1046                    visitor,
1047                    opts.offset,
1048                    opts.limit,
1049                    true,
1050                );
1051                self.scan_sorted_visit_reverse(field_id, &mut pv)
1052            } else {
1053                self.scan_sorted_visit_reverse(field_id, visitor)
1054            }
1055        } else if paginate {
1056            let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
1057                visitor,
1058                opts.offset,
1059                opts.limit,
1060                false,
1061            );
1062            self.scan_sorted_visit(field_id, &mut pv)
1063        } else {
1064            self.scan_sorted_visit(field_id, visitor)
1065        }
1066    }
1067}