1use std::cmp::Ordering;
13use std::collections::BinaryHeap;
14use std::ops::{Bound, RangeBounds};
15
16use arrow::array::*;
17use arrow::compute;
18
19use super::ColumnStore;
20use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
21use crate::types::{LogicalFieldId, Namespace};
22use llkv_result::{Error, Result};
23use llkv_storage::{
24 pager::{BatchGet, GetResult, Pager},
25 serialization::deserialize_array,
26 types::PhysicalKey,
27};
28use simd_r_drive_entry_handle::EntryHandle;
29
30pub mod builder;
31pub use builder::*;
32
33pub mod kmerge;
34pub use kmerge::*;
35
36pub mod options;
37pub use options::*;
38
39pub mod unsorted;
40pub use unsorted::*;
41
42pub mod ranges;
43pub use ranges::*;
44
45pub mod sorted;
46pub use sorted::*;
47
48pub mod visitors;
49pub use visitors::*;
50
51pub mod filter;
52pub use filter::*;
53
54impl<P> ColumnStore<P>
55where
56 P: Pager<Blob = EntryHandle>,
57{
58 fn scan_visit(
60 &self,
61 field_id: LogicalFieldId,
62 visitor: &mut dyn crate::store::scan::PrimitiveVisitor,
63 ) -> Result<()> {
64 let catalog = self.catalog.read().unwrap();
65 crate::store::scan::unsorted_visit(self.pager.as_ref(), &catalog.map, field_id, visitor)
66 }
67
68 fn scan_sorted_visit(
71 &self,
72 field_id: LogicalFieldId,
73 visitor: &mut dyn crate::store::scan::PrimitiveSortedVisitor,
74 ) -> Result<()> {
75 let catalog = self.catalog.read().unwrap();
76 let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
77 let desc_blob = self
78 .pager
79 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
80 .pop()
81 .and_then(|r| match r {
82 GetResult::Raw { bytes, .. } => Some(bytes),
83 _ => None,
84 })
85 .ok_or(Error::NotFound)?;
86 let desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
87 drop(catalog);
88
89 let mut metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
90 for m in crate::store::descriptor::DescriptorIterator::new(
91 self.pager.as_ref(),
92 desc.head_page_pk,
93 ) {
94 let meta = m?;
95 if meta.row_count == 0 {
96 continue;
97 }
98 if meta.value_order_perm_pk == 0 {
99 return Err(Error::NotFound);
100 }
101 metas.push(meta);
102 }
103 if metas.is_empty() {
104 return Ok(());
105 }
106 let buffers = sorted::load_sorted_buffers(self.pager.as_ref(), &metas)?;
107 let first_any =
108 llkv_storage::serialization::deserialize_array(buffers.value_handle(0).clone())?;
109 with_integer_arrow_type!(
110 first_any.data_type().clone(),
111 |ArrowTy| {
112 <ArrowTy as sorted::SortedDispatch>::visit(
113 self.pager.as_ref(),
114 &metas,
115 &buffers,
116 visitor,
117 )
118 },
119 Err(Error::Internal("unsupported sorted dtype".into())),
120 )
121 }
122
123 fn scan_sorted_visit_reverse(
125 &self,
126 field_id: LogicalFieldId,
127 visitor: &mut dyn crate::store::scan::PrimitiveSortedVisitor,
128 ) -> Result<()> {
129 let catalog = self.catalog.read().unwrap();
130 let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
131 let desc_blob = self
132 .pager
133 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
134 .pop()
135 .and_then(|r| match r {
136 GetResult::Raw { bytes, .. } => Some(bytes),
137 _ => None,
138 })
139 .ok_or(Error::NotFound)?;
140 let desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
141 drop(catalog);
142
143 let mut metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
144 for m in crate::store::descriptor::DescriptorIterator::new(
145 self.pager.as_ref(),
146 desc.head_page_pk,
147 ) {
148 let meta = m?;
149 if meta.row_count == 0 {
150 continue;
151 }
152 if meta.value_order_perm_pk == 0 {
153 return Err(Error::NotFound);
154 }
155 metas.push(meta);
156 }
157 if metas.is_empty() {
158 return Ok(());
159 }
160 let mut gets: Vec<BatchGet> = Vec::with_capacity(metas.len() * 2);
161 for m in &metas {
162 gets.push(BatchGet::Raw { key: m.chunk_pk });
163 gets.push(BatchGet::Raw {
164 key: m.value_order_perm_pk,
165 });
166 }
167 let buffers = sorted::load_sorted_buffers(self.pager.as_ref(), &metas)?;
168 let first_any =
169 llkv_storage::serialization::deserialize_array(buffers.value_handle(0).clone())?;
170 with_integer_arrow_type!(
171 first_any.data_type().clone(),
172 |ArrowTy| {
173 <ArrowTy as sorted::SortedDispatch>::visit_rev(
174 self.pager.as_ref(),
175 &metas,
176 &buffers,
177 visitor,
178 )
179 },
180 Err(Error::Internal("unsupported sorted dtype".into())),
181 )
182 }
183
184 pub fn scan(
187 &self,
188 field_id: LogicalFieldId,
189 opts: ScanOptions,
190 visitor: &mut dyn crate::store::scan::PrimitiveFullVisitor,
191 ) -> Result<()> {
192 let paginate = opts.offset > 0 || opts.limit.is_some();
193 if !opts.sorted {
194 if opts.with_row_ids {
195 let row_fid = field_id.with_namespace(Namespace::RowIdShadow);
196 let catalog = self.catalog.read().unwrap();
197 if opts.include_nulls {
198 let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
199 if paginate {
200 let mut pv = crate::store::scan::PaginateVisitor::new(
201 visitor,
202 opts.offset,
203 opts.limit,
204 );
205 return crate::store::scan::unsorted_with_row_ids_and_nulls_visit(
206 self.pager.as_ref(),
207 &catalog.map,
208 field_id,
209 row_fid,
210 anchor_fid,
211 opts.nulls_first,
212 &mut pv,
213 );
214 } else {
215 return crate::store::scan::unsorted_with_row_ids_and_nulls_visit(
216 self.pager.as_ref(),
217 &catalog.map,
218 field_id,
219 row_fid,
220 anchor_fid,
221 opts.nulls_first,
222 visitor,
223 );
224 }
225 }
226 if paginate {
227 let mut pv =
228 crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
229 return crate::store::scan::unsorted_with_row_ids_visit(
230 self.pager.as_ref(),
231 &catalog.map,
232 field_id,
233 row_fid,
234 &mut pv,
235 );
236 } else {
237 return crate::store::scan::unsorted_with_row_ids_visit(
238 self.pager.as_ref(),
239 &catalog.map,
240 field_id,
241 row_fid,
242 visitor,
243 );
244 }
245 }
246 if paginate {
247 let mut pv =
248 crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
249 return self.scan_visit(field_id, &mut pv);
250 } else {
251 return self.scan_visit(field_id, visitor);
252 }
253 }
254
255 if opts.with_row_ids {
256 let row_fid = field_id.with_namespace(Namespace::RowIdShadow);
257 let catalog = self.catalog.read().unwrap();
259 let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
260 let desc_blob = self
261 .pager
262 .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
263 .pop()
264 .and_then(|r| match r {
265 GetResult::Raw { bytes, .. } => Some(bytes),
266 _ => None,
267 })
268 .ok_or(Error::NotFound)?;
269 let desc =
270 crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
271
272 let rid_descriptor_pk = *catalog.map.get(&row_fid).ok_or(Error::NotFound)?;
273 let rid_desc_blob = self
274 .pager
275 .batch_get(&[BatchGet::Raw {
276 key: rid_descriptor_pk,
277 }])?
278 .pop()
279 .and_then(|r| match r {
280 GetResult::Raw { bytes, .. } => Some(bytes),
281 _ => None,
282 })
283 .ok_or(Error::NotFound)?;
284 let rid_desc =
285 crate::store::descriptor::ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
286 drop(catalog);
287
288 let mut metas_val: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
289 for m in crate::store::descriptor::DescriptorIterator::new(
290 self.pager.as_ref(),
291 desc.head_page_pk,
292 ) {
293 let meta = m?;
294 if meta.row_count == 0 {
295 continue;
296 }
297 if meta.value_order_perm_pk == 0 {
298 return Err(Error::NotFound);
299 }
300 metas_val.push(meta);
301 }
302 let mut metas_rid: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
303 for m in crate::store::descriptor::DescriptorIterator::new(
304 self.pager.as_ref(),
305 rid_desc.head_page_pk,
306 ) {
307 let meta = m?;
308 if meta.row_count == 0 {
309 continue;
310 }
311 metas_rid.push(meta);
312 }
313 if metas_val.is_empty() {
314 return Ok(());
315 }
316 if metas_val.len() != metas_rid.len() {
317 return Err(Error::Internal(
318 "sorted_with_row_ids: chunk count mismatch".into(),
319 ));
320 }
321
322 let buffers =
323 sorted::load_sorted_buffers_with_rids(self.pager.as_ref(), &metas_val, &metas_rid)?;
324 let first_any = llkv_storage::serialization::deserialize_array(
325 buffers.base().value_handle(0).clone(),
326 )?;
327 if opts.reverse {
328 if paginate {
329 let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
330 visitor,
331 opts.offset,
332 opts.limit,
333 true,
334 );
335 let vals_res = with_integer_arrow_type!(
336 first_any.data_type().clone(),
337 |ArrowTy| {
338 <ArrowTy as sorted::SortedDispatch>::visit_with_rids_rev(
339 self.pager.as_ref(),
340 &metas_val,
341 &metas_rid,
342 &buffers,
343 &mut pv,
344 )
345 },
346 Err(Error::Internal("unsupported sorted dtype".into())),
347 );
348 if opts.include_nulls {
349 let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
350 let catalog = self.catalog.read().unwrap();
351 let anchor_desc_pk =
352 *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
353 let anchor_desc_blob = self
354 .pager
355 .batch_get(&[BatchGet::Raw {
356 key: anchor_desc_pk,
357 }])?
358 .pop()
359 .and_then(|r| match r {
360 GetResult::Raw { bytes, .. } => Some(bytes),
361 _ => None,
362 })
363 .ok_or(Error::NotFound)?;
364 let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
365 anchor_desc_blob.as_ref(),
366 );
367 drop(catalog);
368 let mut anchor_rids: Vec<UInt64Array> = Vec::new();
369 let mut get_keys: Vec<BatchGet> = Vec::new();
370 let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> =
371 Vec::new();
372 for m in crate::store::descriptor::DescriptorIterator::new(
373 self.pager.as_ref(),
374 anchor_desc.head_page_pk,
375 ) {
376 let mm = m?;
377 if mm.row_count == 0 {
378 continue;
379 }
380 anchor_metas.push(mm);
381 get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
382 }
383 if !anchor_metas.is_empty() {
384 let res = self.pager.batch_get(&get_keys)?;
385 for r in res {
386 if let GetResult::Raw { bytes, .. } = r {
387 let any = deserialize_array(bytes)?;
388 let arr = any
389 .as_any()
390 .downcast_ref::<UInt64Array>()
391 .ok_or_else(|| {
392 Error::Internal("anchor rid not u64".into())
393 })?
394 .clone();
395 anchor_rids.push(arr);
396 }
397 }
398 }
399 let mut present_rids: Vec<UInt64Array> = Vec::new();
400 for mr in &metas_rid {
401 let any = deserialize_array(
402 buffers
403 .rid_by_pk(mr.chunk_pk)
404 .ok_or(Error::NotFound)?
405 .clone(),
406 )?;
407 let arr = any
408 .as_any()
409 .downcast_ref::<UInt64Array>()
410 .ok_or_else(|| Error::Internal("rid not u64".into()))?
411 .clone();
412 present_rids.push(arr);
413 }
414 let mut ai = anchor_rids.len();
415 let mut aj: isize = -1; let mut pi = present_rids.len();
417 let mut pj: isize = -1;
418 let mut buf: Vec<u64> = Vec::new();
419 let mut flush = |v: &mut Vec<u64>| -> Result<()> {
420 if v.is_empty() {
421 return Ok(());
422 }
423 let arr = UInt64Array::from(std::mem::take(v));
424 let len = arr.len();
425 pv.null_run(&arr, 0, len);
426 Ok(())
427 };
428 if opts.nulls_first {
430 if ai > 0 {
432 ai -= 1;
433 aj = anchor_rids[ai].len() as isize - 1;
434 }
435 if pi > 0 {
436 pi -= 1;
437 pj = present_rids[pi].len() as isize - 1;
438 }
439 while ai < anchor_rids.len() {
440 let a = &anchor_rids[ai];
442 while aj >= 0 {
443 let av = a.value(aj as usize);
444 while pi < present_rids.len() {
446 let p = &present_rids[pi];
447 if pj < 0 {
448 if pi == 0 {
449 break;
450 }
451 pi -= 1;
452 pj = present_rids[pi].len() as isize - 1;
453 continue;
454 }
455 let pv2 = p.value(pj as usize);
456 if pv2 > av {
457 pj -= 1;
458 } else {
459 break;
460 }
461 }
462 let present_eq = if pi < present_rids.len() {
463 let p = &present_rids[pi];
464 if pj >= 0 {
465 p.value(pj as usize) == av
466 } else {
467 false
468 }
469 } else {
470 false
471 };
472 if !present_eq {
473 buf.push(av);
474 }
475 if present_eq {
476 pj -= 1;
477 }
478 aj -= 1;
479 if buf.len() >= 4096 {
480 flush(&mut buf)?;
481 }
482 }
483 if ai == 0 {
484 break;
485 }
486 ai -= 1;
487 aj = anchor_rids[ai].len() as isize - 1;
488 }
489 flush(&mut buf)?;
490 vals_res
491 } else {
492 vals_res.and_then(|_| {
493 if ai > 0 {
494 ai -= 1;
495 aj = anchor_rids[ai].len() as isize - 1;
496 }
497 if pi > 0 {
498 pi -= 1;
499 pj = present_rids[pi].len() as isize - 1;
500 }
501 while ai < anchor_rids.len() {
502 let a = &anchor_rids[ai];
503 while aj >= 0 {
504 let av = a.value(aj as usize);
505 while pi < present_rids.len() {
506 let p = &present_rids[pi];
507 if pj < 0 {
508 if pi == 0 {
509 break;
510 }
511 pi -= 1;
512 pj = present_rids[pi].len() as isize - 1;
513 continue;
514 }
515 let pv2 = p.value(pj as usize);
516 if pv2 > av {
517 pj -= 1;
518 } else {
519 break;
520 }
521 }
522 let present_eq = if pi < present_rids.len() {
523 let p = &present_rids[pi];
524 if pj >= 0 {
525 p.value(pj as usize) == av
526 } else {
527 false
528 }
529 } else {
530 false
531 };
532 if !present_eq {
533 buf.push(av);
534 }
535 if present_eq {
536 pj -= 1;
537 }
538 aj -= 1;
539 if buf.len() >= 4096 {
540 flush(&mut buf)?;
541 }
542 }
543 if ai == 0 {
544 break;
545 }
546 ai -= 1;
547 aj = anchor_rids[ai].len() as isize - 1;
548 }
549 flush(&mut buf)?;
550 Ok(())
551 })
552 }
553 } else {
554 vals_res
555 }
556 } else {
557 with_integer_arrow_type!(
558 first_any.data_type().clone(),
559 |ArrowTy| {
560 <ArrowTy as sorted::SortedDispatch>::visit_with_rids_rev(
561 self.pager.as_ref(),
562 &metas_val,
563 &metas_rid,
564 &buffers,
565 visitor,
566 )
567 },
568 Err(Error::Internal("unsupported sorted dtype".into())),
569 )
570 }
571 } else if paginate {
572 let mut pv =
573 crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
574 let vals_res = with_integer_arrow_type!(
575 first_any.data_type().clone(),
576 |ArrowTy| {
577 <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
578 self.pager.as_ref(),
579 &metas_val,
580 &metas_rid,
581 &buffers,
582 &mut pv,
583 )
584 },
585 Err(Error::Internal("unsupported sorted dtype".into())),
586 );
587 if opts.include_nulls {
588 let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
590 let catalog = self.catalog.read().unwrap();
592 let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
593 let anchor_desc_blob = self
594 .pager
595 .batch_get(&[BatchGet::Raw {
596 key: anchor_desc_pk,
597 }])?
598 .pop()
599 .and_then(|r| match r {
600 GetResult::Raw { bytes, .. } => Some(bytes),
601 _ => None,
602 })
603 .ok_or(Error::NotFound)?;
604 let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
605 anchor_desc_blob.as_ref(),
606 );
607 drop(catalog);
608 let mut anchor_rids: Vec<UInt64Array> = Vec::new();
609 let mut get_keys: Vec<BatchGet> = Vec::new();
610 let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
611 for m in crate::store::descriptor::DescriptorIterator::new(
612 self.pager.as_ref(),
613 anchor_desc.head_page_pk,
614 ) {
615 let mm = m?;
616 if mm.row_count == 0 {
617 continue;
618 }
619 anchor_metas.push(mm);
620 get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
621 }
622 if !anchor_metas.is_empty() {
623 let res = self.pager.batch_get(&get_keys)?;
624 for r in res {
625 if let GetResult::Raw { bytes, .. } = r {
626 let any = deserialize_array(bytes)?;
627 let arr = any
628 .as_any()
629 .downcast_ref::<UInt64Array>()
630 .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
631 .clone();
632 anchor_rids.push(arr);
633 }
634 }
635 }
636 let mut present_rids: Vec<UInt64Array> = Vec::new();
638 for mr in &metas_rid {
639 let any = deserialize_array(
640 buffers
641 .rid_by_pk(mr.chunk_pk)
642 .ok_or(Error::NotFound)?
643 .clone(),
644 )?;
645 let arr = any
646 .as_any()
647 .downcast_ref::<UInt64Array>()
648 .ok_or_else(|| Error::Internal("rid not u64".into()))?
649 .clone();
650 present_rids.push(arr);
651 }
652 let mut ai = 0usize;
654 let mut aj = 0usize; let mut pi = 0usize;
656 let mut pj = 0usize; let mut buf: Vec<u64> = Vec::new();
658 let mut flush = |v: &mut Vec<u64>| -> Result<()> {
659 if v.is_empty() {
660 return Ok(());
661 }
662 let arr = UInt64Array::from(std::mem::take(v));
663 let len = arr.len();
664 pv.null_run(&arr, 0, len);
665 Ok(())
666 };
667 if opts.nulls_first {
669 while ai < anchor_rids.len() {
671 let a = &anchor_rids[ai];
672 while aj < a.len() {
673 let av = a.value(aj);
674 while pi < present_rids.len() {
676 let p = &present_rids[pi];
677 if pj >= p.len() {
678 pi += 1;
679 pj = 0;
680 continue;
681 }
682 let pv2 = p.value(pj);
683 if pv2 < av {
684 pj += 1;
685 } else {
686 break;
687 }
688 }
689 let present_eq = if pi < present_rids.len() {
690 let p = &present_rids[pi];
691 if pj < p.len() {
692 p.value(pj) == av
693 } else {
694 false
695 }
696 } else {
697 false
698 };
699 if !present_eq {
700 buf.push(av);
701 }
702 if present_eq {
703 pj += 1;
704 }
705 aj += 1;
706 if buf.len() >= 4096 {
707 flush(&mut buf)?;
708 }
709 }
710 ai += 1;
711 aj = 0;
712 }
713 flush(&mut buf)?;
714 vals_res
715 } else {
716 vals_res.and_then(|_| {
717 while ai < anchor_rids.len() {
718 let a = &anchor_rids[ai];
719 while aj < a.len() {
720 let av = a.value(aj);
721 while pi < present_rids.len() {
722 let p = &present_rids[pi];
723 if pj >= p.len() {
724 pi += 1;
725 pj = 0;
726 continue;
727 }
728 let pv2 = p.value(pj);
729 if pv2 < av {
730 pj += 1;
731 } else {
732 break;
733 }
734 }
735 let present_eq = if pi < present_rids.len() {
736 let p = &present_rids[pi];
737 if pj < p.len() {
738 p.value(pj) == av
739 } else {
740 false
741 }
742 } else {
743 false
744 };
745 if !present_eq {
746 buf.push(av);
747 }
748 if present_eq {
749 pj += 1;
750 }
751 aj += 1;
752 if buf.len() >= 4096 {
753 flush(&mut buf)?;
754 }
755 }
756 ai += 1;
757 aj = 0;
758 }
759 flush(&mut buf)?;
760 Ok(())
761 })
762 }
763 } else {
764 vals_res
765 }
766 } else if opts.include_nulls && opts.nulls_first {
767 let anchor_fid = opts.anchor_row_id_field.ok_or_else(|| {
768 Error::Internal("anchor_row_id_field required when include_nulls=true".into())
769 })?;
770 let catalog = self.catalog.read().unwrap();
771 let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
772 let anchor_desc_blob = self
773 .pager
774 .batch_get(&[BatchGet::Raw {
775 key: anchor_desc_pk,
776 }])?
777 .pop()
778 .and_then(|r| match r {
779 GetResult::Raw { bytes, .. } => Some(bytes),
780 _ => None,
781 })
782 .ok_or(Error::NotFound)?;
783 let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
784 anchor_desc_blob.as_ref(),
785 );
786 drop(catalog);
787 let mut anchor_rids: Vec<UInt64Array> = Vec::new();
788 let mut get_keys: Vec<BatchGet> = Vec::new();
789 let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
790 for m in crate::store::descriptor::DescriptorIterator::new(
791 self.pager.as_ref(),
792 anchor_desc.head_page_pk,
793 ) {
794 let mm = m?;
795 if mm.row_count == 0 {
796 continue;
797 }
798 anchor_metas.push(mm);
799 get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
800 }
801 if !anchor_metas.is_empty() {
802 let res = self.pager.batch_get(&get_keys)?;
803 for r in res {
804 if let GetResult::Raw { bytes, .. } = r {
805 let any = deserialize_array(bytes)?;
806 let arr = any
807 .as_any()
808 .downcast_ref::<UInt64Array>()
809 .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
810 .clone();
811 anchor_rids.push(arr);
812 }
813 }
814 }
815 let mut present_rids: Vec<UInt64Array> = Vec::new();
816 for mr in &metas_rid {
817 let any = deserialize_array(
818 buffers
819 .rid_by_pk(mr.chunk_pk)
820 .ok_or(Error::NotFound)?
821 .clone(),
822 )?;
823 let arr = any
824 .as_any()
825 .downcast_ref::<UInt64Array>()
826 .ok_or_else(|| Error::Internal("rid not u64".into()))?
827 .clone();
828 present_rids.push(arr);
829 }
830 let mut ai = 0usize;
832 let mut aj = 0usize;
833 let mut pi = 0usize;
834 let mut pj = 0usize;
835 let mut buf: Vec<u64> = Vec::new();
836 let mut flush = |v: &mut Vec<u64>| -> Result<()> {
837 if v.is_empty() {
838 return Ok(());
839 }
840 let arr = UInt64Array::from(std::mem::take(v));
841 let len = arr.len();
842 visitor.null_run(&arr, 0, len);
843 Ok(())
844 };
845 while ai < anchor_rids.len() {
846 let a = &anchor_rids[ai];
847 while aj < a.len() {
848 let av = a.value(aj);
849 while pi < present_rids.len() {
850 let p = &present_rids[pi];
851 if pj >= p.len() {
852 pi += 1;
853 pj = 0;
854 continue;
855 }
856 let pv2 = p.value(pj);
857 if pv2 < av {
858 pj += 1;
859 } else {
860 break;
861 }
862 }
863 let present_eq = if pi < present_rids.len() {
864 let p = &present_rids[pi];
865 if pj < p.len() {
866 p.value(pj) == av
867 } else {
868 false
869 }
870 } else {
871 false
872 };
873 if !present_eq {
874 buf.push(av);
875 }
876 if present_eq {
877 pj += 1;
878 }
879 aj += 1;
880 if buf.len() >= 4096 {
881 flush(&mut buf)?;
882 }
883 }
884 ai += 1;
885 aj = 0;
886 }
887 flush(&mut buf)?;
888 with_integer_arrow_type!(
890 first_any.data_type().clone(),
891 |ArrowTy| {
892 <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
893 self.pager.as_ref(),
894 &metas_val,
895 &metas_rid,
896 &buffers,
897 visitor,
898 )
899 },
900 Err(Error::Internal("unsupported sorted dtype".into())),
901 )
902 } else {
903 let res = with_integer_arrow_type!(
905 first_any.data_type().clone(),
906 |ArrowTy| {
907 <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
908 self.pager.as_ref(),
909 &metas_val,
910 &metas_rid,
911 &buffers,
912 visitor,
913 )
914 },
915 Err(Error::Internal("unsupported sorted dtype".into())),
916 );
917 if opts.include_nulls {
918 res?;
919 let anchor_fid = opts.anchor_row_id_field.ok_or_else(|| {
920 Error::Internal(
921 "anchor_row_id_field required when include_nulls=true".into(),
922 )
923 })?;
924 let catalog = self.catalog.read().unwrap();
925 let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
926 let anchor_desc_blob = self
927 .pager
928 .batch_get(&[BatchGet::Raw {
929 key: anchor_desc_pk,
930 }])?
931 .pop()
932 .and_then(|r| match r {
933 GetResult::Raw { bytes, .. } => Some(bytes),
934 _ => None,
935 })
936 .ok_or(Error::NotFound)?;
937 let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
938 anchor_desc_blob.as_ref(),
939 );
940 drop(catalog);
941 let mut anchor_rids: Vec<UInt64Array> = Vec::new();
942 let mut get_keys: Vec<BatchGet> = Vec::new();
943 let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
944 for m in crate::store::descriptor::DescriptorIterator::new(
945 self.pager.as_ref(),
946 anchor_desc.head_page_pk,
947 ) {
948 let mm = m?;
949 if mm.row_count == 0 {
950 continue;
951 }
952 anchor_metas.push(mm);
953 get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
954 }
955 if !anchor_metas.is_empty() {
956 let res_get = self.pager.batch_get(&get_keys)?;
957 for r in res_get {
958 if let GetResult::Raw { bytes, .. } = r {
959 let any = deserialize_array(bytes)?;
960 let arr = any
961 .as_any()
962 .downcast_ref::<UInt64Array>()
963 .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
964 .clone();
965 anchor_rids.push(arr);
966 }
967 }
968 }
969 let mut present_rids: Vec<UInt64Array> = Vec::new();
970 for mr in &metas_rid {
971 let any = deserialize_array(
972 buffers
973 .rid_by_pk(mr.chunk_pk)
974 .ok_or(Error::NotFound)?
975 .clone(),
976 )?;
977 let arr = any
978 .as_any()
979 .downcast_ref::<UInt64Array>()
980 .ok_or_else(|| Error::Internal("rid not u64".into()))?
981 .clone();
982 present_rids.push(arr);
983 }
984 let mut ai = 0usize;
985 let mut aj = 0usize;
986 let mut pi = 0usize;
987 let mut pj = 0usize;
988 let mut buf: Vec<u64> = Vec::new();
989 let mut flush = |v: &mut Vec<u64>| -> Result<()> {
990 if v.is_empty() {
991 return Ok(());
992 }
993 let arr = UInt64Array::from(std::mem::take(v));
994 let len = arr.len();
995 visitor.null_run(&arr, 0, len);
996 Ok(())
997 };
998 while ai < anchor_rids.len() {
999 let a = &anchor_rids[ai];
1000 while aj < a.len() {
1001 let av = a.value(aj);
1002 while pi < present_rids.len() {
1003 let p = &present_rids[pi];
1004 if pj >= p.len() {
1005 pi += 1;
1006 pj = 0;
1007 continue;
1008 }
1009 let pv2 = p.value(pj);
1010 if pv2 < av {
1011 pj += 1;
1012 } else {
1013 break;
1014 }
1015 }
1016 let present_eq = if pi < present_rids.len() {
1017 let p = &present_rids[pi];
1018 if pj < p.len() {
1019 p.value(pj) == av
1020 } else {
1021 false
1022 }
1023 } else {
1024 false
1025 };
1026 if !present_eq {
1027 buf.push(av);
1028 }
1029 if present_eq {
1030 pj += 1;
1031 }
1032 aj += 1;
1033 if buf.len() >= 4096 {
1034 flush(&mut buf)?;
1035 }
1036 }
1037 ai += 1;
1038 aj = 0;
1039 }
1040 flush(&mut buf)?;
1041 Ok(())
1042 } else {
1043 res
1044 }
1045 }
1046 } else if opts.reverse {
1047 if paginate {
1048 let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
1049 visitor,
1050 opts.offset,
1051 opts.limit,
1052 true,
1053 );
1054 self.scan_sorted_visit_reverse(field_id, &mut pv)
1055 } else {
1056 self.scan_sorted_visit_reverse(field_id, visitor)
1057 }
1058 } else if paginate {
1059 let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
1060 visitor,
1061 opts.offset,
1062 opts.limit,
1063 false,
1064 );
1065 self.scan_sorted_visit(field_id, &mut pv)
1066 } else {
1067 self.scan_sorted_visit(field_id, visitor)
1068 }
1069 }
1070}