use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::{Bound, RangeBounds};

use arrow::array::*;
use arrow::compute;

use super::ColumnStore;
use crate::serialization::deserialize_array;
use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
use llkv_result::{Error, Result};
use llkv_storage::{
    pager::{BatchGet, GetResult, Pager},
    types::PhysicalKey,
};
use llkv_types::ids::{LogicalFieldId, LogicalStorageNamespace};
use simd_r_drive_entry_handle::EntryHandle;

pub mod builder;
pub use builder::*;

pub mod kmerge;
pub use kmerge::*;

pub mod options;
pub use options::*;

pub mod unsorted;
pub use unsorted::*;

pub mod ranges;
pub use ranges::*;

pub mod sorted;
pub use sorted::*;

pub mod visitors;
pub use visitors::*;

pub mod filter;
pub use filter::*;

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle>,
{
    fn scan_visit(
        &self,
        field_id: LogicalFieldId,
        visitor: &mut dyn crate::store::scan::PrimitiveVisitor,
    ) -> Result<()> {
        let catalog = self.catalog.read().unwrap();
        crate::store::scan::unsorted_visit(self.pager.as_ref(), &catalog.map, field_id, visitor)
    }

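    /// Scans `field_id` in ascending value order.
    ///
    /// Requires every non-empty chunk to carry a value-order permutation
    /// (`value_order_perm_pk`), then dispatches on the column's Arrow type
    /// via `with_integer_arrow_type!`.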
    fn scan_sorted_visit(
        &self,
        field_id: LogicalFieldId,
        visitor: &mut dyn crate::store::scan::PrimitiveSortedVisitor,
    ) -> Result<()> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        drop(catalog);

        let mut metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
        for m in crate::store::descriptor::DescriptorIterator::new(
            self.pager.as_ref(),
            desc.head_page_pk,
        ) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            if meta.value_order_perm_pk == 0 {
                return Err(Error::NotFound);
            }
            metas.push(meta);
        }
        if metas.is_empty() {
            return Ok(());
        }
        let buffers = sorted::load_sorted_buffers(self.pager.as_ref(), &metas)?;
        let first_any = deserialize_array(buffers.value_handle(0).clone())?;
        with_integer_arrow_type!(
            first_any.data_type().clone(),
            |ArrowTy| {
                <ArrowTy as sorted::SortedDispatch>::visit(
                    self.pager.as_ref(),
                    &metas,
                    &buffers,
                    visitor,
                )
            },
            Err(Error::Internal("unsupported sorted dtype".into())),
        )
    }

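    /// Like `scan_sorted_visit`, but visits values in descending order via
    /// `SortedDispatch::visit_rev`.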
    fn scan_sorted_visit_reverse(
        &self,
        field_id: LogicalFieldId,
        visitor: &mut dyn crate::store::scan::PrimitiveSortedVisitor,
    ) -> Result<()> {
        let catalog = self.catalog.read().unwrap();
        let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
        let desc_blob = self
            .pager
            .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
            .pop()
            .and_then(|r| match r {
                GetResult::Raw { bytes, .. } => Some(bytes),
                _ => None,
            })
            .ok_or(Error::NotFound)?;
        let desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
        drop(catalog);

        let mut metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
        for m in crate::store::descriptor::DescriptorIterator::new(
            self.pager.as_ref(),
            desc.head_page_pk,
        ) {
            let meta = m?;
            if meta.row_count == 0 {
                continue;
            }
            if meta.value_order_perm_pk == 0 {
                return Err(Error::NotFound);
            }
            metas.push(meta);
        }
        if metas.is_empty() {
            return Ok(());
        }
        let buffers = sorted::load_sorted_buffers(self.pager.as_ref(), &metas)?;
        let first_any = crate::serialization::deserialize_array(buffers.value_handle(0).clone())?;
        with_integer_arrow_type!(
            first_any.data_type().clone(),
            |ArrowTy| {
                <ArrowTy as sorted::SortedDispatch>::visit_rev(
                    self.pager.as_ref(),
                    &metas,
                    &buffers,
                    visitor,
                )
            },
            Err(Error::Internal("unsupported sorted dtype".into())),
        )
    }

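    /// Scans `field_id` according to `opts`.
    ///
    /// * `sorted == false`: storage-order scan, optionally joined with the
    ///   row-id shadow column (`with_row_ids`) and, via `include_nulls`,
    ///   null handling against an anchor row-id column.
    /// * `sorted == true`: value-ordered scan, optionally reversed and
    ///   optionally paired with row ids; anchor rows missing from this
    ///   column are reported to the visitor as null runs.
    /// * `offset` / `limit` are applied by wrapping the visitor in
    ///   `PaginateVisitor`.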
    pub fn scan(
        &self,
        field_id: LogicalFieldId,
        opts: ScanOptions,
        visitor: &mut dyn crate::store::scan::PrimitiveFullVisitor,
    ) -> Result<()> {
        let paginate = opts.offset > 0 || opts.limit.is_some();
        if !opts.sorted {
            if opts.with_row_ids {
                let row_fid = field_id.with_namespace(LogicalStorageNamespace::RowIdShadow);
                let catalog = self.catalog.read().unwrap();
                if opts.include_nulls {
                    let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
                    if paginate {
                        let mut pv = crate::store::scan::PaginateVisitor::new(
                            visitor,
                            opts.offset,
                            opts.limit,
                        );
                        return crate::store::scan::unsorted_with_row_ids_and_nulls_visit(
                            self.pager.as_ref(),
                            &catalog.map,
                            field_id,
                            row_fid,
                            anchor_fid,
                            opts.nulls_first,
                            &mut pv,
                        );
                    } else {
                        return crate::store::scan::unsorted_with_row_ids_and_nulls_visit(
                            self.pager.as_ref(),
                            &catalog.map,
                            field_id,
                            row_fid,
                            anchor_fid,
                            opts.nulls_first,
                            visitor,
                        );
                    }
                }
                if paginate {
                    let mut pv =
                        crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
                    return crate::store::scan::unsorted_with_row_ids_visit(
                        self.pager.as_ref(),
                        &catalog.map,
                        field_id,
                        row_fid,
                        &mut pv,
                    );
                } else {
                    return crate::store::scan::unsorted_with_row_ids_visit(
                        self.pager.as_ref(),
                        &catalog.map,
                        field_id,
                        row_fid,
                        visitor,
                    );
                }
            }
            if paginate {
                let mut pv =
                    crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
                return self.scan_visit(field_id, &mut pv);
            } else {
                return self.scan_visit(field_id, visitor);
            }
        }

        if opts.with_row_ids {
            let row_fid = field_id.with_namespace(LogicalStorageNamespace::RowIdShadow);
            let catalog = self.catalog.read().unwrap();
            let descriptor_pk = *catalog.map.get(&field_id).ok_or(Error::NotFound)?;
            let desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw { key: descriptor_pk }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let desc =
                crate::store::descriptor::ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

            let rid_descriptor_pk = *catalog.map.get(&row_fid).ok_or(Error::NotFound)?;
            let rid_desc_blob = self
                .pager
                .batch_get(&[BatchGet::Raw {
                    key: rid_descriptor_pk,
                }])?
                .pop()
                .and_then(|r| match r {
                    GetResult::Raw { bytes, .. } => Some(bytes),
                    _ => None,
                })
                .ok_or(Error::NotFound)?;
            let rid_desc =
                crate::store::descriptor::ColumnDescriptor::from_le_bytes(rid_desc_blob.as_ref());
            drop(catalog);

            let mut metas_val: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
            for m in crate::store::descriptor::DescriptorIterator::new(
                self.pager.as_ref(),
                desc.head_page_pk,
            ) {
                let meta = m?;
                if meta.row_count == 0 {
                    continue;
                }
                if meta.value_order_perm_pk == 0 {
                    return Err(Error::NotFound);
                }
                metas_val.push(meta);
            }
            let mut metas_rid: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
            for m in crate::store::descriptor::DescriptorIterator::new(
                self.pager.as_ref(),
                rid_desc.head_page_pk,
            ) {
                let meta = m?;
                if meta.row_count == 0 {
                    continue;
                }
                metas_rid.push(meta);
            }
            if metas_val.is_empty() {
                return Ok(());
            }
            if metas_val.len() != metas_rid.len() {
                return Err(Error::Internal(
                    "sorted_with_row_ids: chunk count mismatch".into(),
                ));
            }

            let buffers =
                sorted::load_sorted_buffers_with_rids(self.pager.as_ref(), &metas_val, &metas_rid)?;
            let first_any =
                crate::serialization::deserialize_array(buffers.base().value_handle(0).clone())?;
            if opts.reverse {
                if paginate {
                    let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
                        visitor,
                        opts.offset,
                        opts.limit,
                        true,
                    );
                    let vals_res = with_integer_arrow_type!(
                        first_any.data_type().clone(),
                        |ArrowTy| {
                            <ArrowTy as sorted::SortedDispatch>::visit_with_rids_rev(
                                self.pager.as_ref(),
                                &metas_val,
                                &metas_rid,
                                &buffers,
                                &mut pv,
                            )
                        },
                        Err(Error::Internal("unsupported sorted dtype".into())),
                    );
                    if opts.include_nulls {
                        let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
                        let catalog = self.catalog.read().unwrap();
                        let anchor_desc_pk =
                            *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
                        let anchor_desc_blob = self
                            .pager
                            .batch_get(&[BatchGet::Raw {
                                key: anchor_desc_pk,
                            }])?
                            .pop()
                            .and_then(|r| match r {
                                GetResult::Raw { bytes, .. } => Some(bytes),
                                _ => None,
                            })
                            .ok_or(Error::NotFound)?;
                        let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
                            anchor_desc_blob.as_ref(),
                        );
                        drop(catalog);
                        let mut anchor_rids: Vec<UInt64Array> = Vec::new();
                        let mut get_keys: Vec<BatchGet> = Vec::new();
                        let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> =
                            Vec::new();
                        for m in crate::store::descriptor::DescriptorIterator::new(
                            self.pager.as_ref(),
                            anchor_desc.head_page_pk,
                        ) {
                            let mm = m?;
                            if mm.row_count == 0 {
                                continue;
                            }
                            anchor_metas.push(mm);
                            get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
                        }
                        if !anchor_metas.is_empty() {
                            let res = self.pager.batch_get(&get_keys)?;
                            for r in res {
                                if let GetResult::Raw { bytes, .. } = r {
                                    let any = deserialize_array(bytes)?;
                                    let arr = any
                                        .as_any()
                                        .downcast_ref::<UInt64Array>()
                                        .ok_or_else(|| {
                                            Error::Internal("anchor rid not u64".into())
                                        })?
                                        .clone();
                                    anchor_rids.push(arr);
                                }
                            }
                        }
                        let mut present_rids: Vec<UInt64Array> = Vec::new();
                        for mr in &metas_rid {
                            let any = deserialize_array(
                                buffers
                                    .rid_by_pk(mr.chunk_pk)
                                    .ok_or(Error::NotFound)?
                                    .clone(),
                            )?;
                            let arr = any
                                .as_any()
                                .downcast_ref::<UInt64Array>()
                                .ok_or_else(|| Error::Internal("rid not u64".into()))?
                                .clone();
                            present_rids.push(arr);
                        }
                        // Two-pointer merge state, walking the anchor and present
                        // row-id chunks from the back.
                        let mut ai = anchor_rids.len();
                        let mut aj: isize = -1;
                        let mut pi = present_rids.len();
                        let mut pj: isize = -1;
                        let mut buf: Vec<u64> = Vec::new();
                        let mut flush = |v: &mut Vec<u64>| -> Result<()> {
                            if v.is_empty() {
                                return Ok(());
                            }
                            let arr = UInt64Array::from(std::mem::take(v));
                            let len = arr.len();
                            pv.null_run(&arr, 0, len);
                            Ok(())
                        };
                        if opts.nulls_first {
                            // Anchor ids missing from the present set become
                            // null runs.
                            if ai > 0 {
                                ai -= 1;
                                aj = anchor_rids[ai].len() as isize - 1;
                            }
                            if pi > 0 {
                                pi -= 1;
                                pj = present_rids[pi].len() as isize - 1;
                            }
                            while ai < anchor_rids.len() {
                                let a = &anchor_rids[ai];
                                while aj >= 0 {
                                    let av = a.value(aj as usize);
                                    while pi < present_rids.len() {
                                        let p = &present_rids[pi];
                                        if pj < 0 {
                                            if pi == 0 {
                                                break;
                                            }
                                            pi -= 1;
                                            pj = present_rids[pi].len() as isize - 1;
                                            continue;
                                        }
                                        let pv2 = p.value(pj as usize);
                                        if pv2 > av {
                                            pj -= 1;
                                        } else {
                                            break;
                                        }
                                    }
                                    let present_eq = if pi < present_rids.len() {
                                        let p = &present_rids[pi];
                                        if pj >= 0 {
                                            p.value(pj as usize) == av
                                        } else {
                                            false
                                        }
                                    } else {
                                        false
                                    };
                                    if !present_eq {
                                        buf.push(av);
                                    }
                                    if present_eq {
                                        pj -= 1;
                                    }
                                    aj -= 1;
                                    if buf.len() >= 4096 {
                                        flush(&mut buf)?;
                                    }
                                }
                                if ai == 0 {
                                    break;
                                }
                                ai -= 1;
                                aj = anchor_rids[ai].len() as isize - 1;
                            }
                            flush(&mut buf)?;
                            vals_res
                        } else {
                            vals_res.and_then(|_| {
                                if ai > 0 {
                                    ai -= 1;
                                    aj = anchor_rids[ai].len() as isize - 1;
                                }
                                if pi > 0 {
                                    pi -= 1;
                                    pj = present_rids[pi].len() as isize - 1;
                                }
                                while ai < anchor_rids.len() {
                                    let a = &anchor_rids[ai];
                                    while aj >= 0 {
                                        let av = a.value(aj as usize);
                                        while pi < present_rids.len() {
                                            let p = &present_rids[pi];
                                            if pj < 0 {
                                                if pi == 0 {
                                                    break;
                                                }
                                                pi -= 1;
                                                pj = present_rids[pi].len() as isize - 1;
                                                continue;
                                            }
                                            let pv2 = p.value(pj as usize);
                                            if pv2 > av {
                                                pj -= 1;
                                            } else {
                                                break;
                                            }
                                        }
                                        let present_eq = if pi < present_rids.len() {
                                            let p = &present_rids[pi];
                                            if pj >= 0 {
                                                p.value(pj as usize) == av
                                            } else {
                                                false
                                            }
                                        } else {
                                            false
                                        };
                                        if !present_eq {
                                            buf.push(av);
                                        }
                                        if present_eq {
                                            pj -= 1;
                                        }
                                        aj -= 1;
                                        if buf.len() >= 4096 {
                                            flush(&mut buf)?;
                                        }
                                    }
                                    if ai == 0 {
                                        break;
                                    }
                                    ai -= 1;
                                    aj = anchor_rids[ai].len() as isize - 1;
                                }
                                flush(&mut buf)?;
                                Ok(())
                            })
                        }
                    } else {
                        vals_res
                    }
                } else {
                    with_integer_arrow_type!(
                        first_any.data_type().clone(),
                        |ArrowTy| {
                            <ArrowTy as sorted::SortedDispatch>::visit_with_rids_rev(
                                self.pager.as_ref(),
                                &metas_val,
                                &metas_rid,
                                &buffers,
                                visitor,
                            )
                        },
                        Err(Error::Internal("unsupported sorted dtype".into())),
                    )
                }
            } else if paginate {
                let mut pv =
                    crate::store::scan::PaginateVisitor::new(visitor, opts.offset, opts.limit);
                let vals_res = with_integer_arrow_type!(
                    first_any.data_type().clone(),
                    |ArrowTy| {
                        <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
                            self.pager.as_ref(),
                            &metas_val,
                            &metas_rid,
                            &buffers,
                            &mut pv,
                        )
                    },
                    Err(Error::Internal("unsupported sorted dtype".into())),
                );
                if opts.include_nulls {
                    let anchor_fid = opts.anchor_row_id_field.unwrap_or(row_fid);
                    let catalog = self.catalog.read().unwrap();
                    let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
                    let anchor_desc_blob = self
                        .pager
                        .batch_get(&[BatchGet::Raw {
                            key: anchor_desc_pk,
                        }])?
                        .pop()
                        .and_then(|r| match r {
                            GetResult::Raw { bytes, .. } => Some(bytes),
                            _ => None,
                        })
                        .ok_or(Error::NotFound)?;
                    let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
                        anchor_desc_blob.as_ref(),
                    );
                    drop(catalog);
                    let mut anchor_rids: Vec<UInt64Array> = Vec::new();
                    let mut get_keys: Vec<BatchGet> = Vec::new();
                    let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
                    for m in crate::store::descriptor::DescriptorIterator::new(
                        self.pager.as_ref(),
                        anchor_desc.head_page_pk,
                    ) {
                        let mm = m?;
                        if mm.row_count == 0 {
                            continue;
                        }
                        anchor_metas.push(mm);
                        get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
                    }
                    if !anchor_metas.is_empty() {
                        let res = self.pager.batch_get(&get_keys)?;
                        for r in res {
                            if let GetResult::Raw { bytes, .. } = r {
                                let any = deserialize_array(bytes)?;
                                let arr = any
                                    .as_any()
                                    .downcast_ref::<UInt64Array>()
                                    .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
                                    .clone();
                                anchor_rids.push(arr);
                            }
                        }
                    }
                    let mut present_rids: Vec<UInt64Array> = Vec::new();
                    for mr in &metas_rid {
                        let any = deserialize_array(
                            buffers
                                .rid_by_pk(mr.chunk_pk)
                                .ok_or(Error::NotFound)?
                                .clone(),
                        )?;
                        let arr = any
                            .as_any()
                            .downcast_ref::<UInt64Array>()
                            .ok_or_else(|| Error::Internal("rid not u64".into()))?
                            .clone();
                        present_rids.push(arr);
                    }
                    // Forward two-pointer merge: anchor row ids that never appear
                    // in the present set are emitted as null runs.
                    let mut ai = 0usize;
                    let mut aj = 0usize;
                    let mut pi = 0usize;
                    let mut pj = 0usize;
                    let mut buf: Vec<u64> = Vec::new();
                    let mut flush = |v: &mut Vec<u64>| -> Result<()> {
                        if v.is_empty() {
                            return Ok(());
                        }
                        let arr = UInt64Array::from(std::mem::take(v));
                        let len = arr.len();
                        pv.null_run(&arr, 0, len);
                        Ok(())
                    };
                    if opts.nulls_first {
                        while ai < anchor_rids.len() {
                            let a = &anchor_rids[ai];
                            while aj < a.len() {
                                let av = a.value(aj);
                                while pi < present_rids.len() {
                                    let p = &present_rids[pi];
                                    if pj >= p.len() {
                                        pi += 1;
                                        pj = 0;
                                        continue;
                                    }
                                    let pv2 = p.value(pj);
                                    if pv2 < av {
                                        pj += 1;
                                    } else {
                                        break;
                                    }
                                }
                                let present_eq = if pi < present_rids.len() {
                                    let p = &present_rids[pi];
                                    if pj < p.len() {
                                        p.value(pj) == av
                                    } else {
                                        false
                                    }
                                } else {
                                    false
                                };
                                if !present_eq {
                                    buf.push(av);
                                }
                                if present_eq {
                                    pj += 1;
                                }
                                aj += 1;
                                if buf.len() >= 4096 {
                                    flush(&mut buf)?;
                                }
                            }
                            ai += 1;
                            aj = 0;
                        }
                        flush(&mut buf)?;
                        vals_res
                    } else {
                        vals_res.and_then(|_| {
                            while ai < anchor_rids.len() {
                                let a = &anchor_rids[ai];
                                while aj < a.len() {
                                    let av = a.value(aj);
                                    while pi < present_rids.len() {
                                        let p = &present_rids[pi];
                                        if pj >= p.len() {
                                            pi += 1;
                                            pj = 0;
                                            continue;
                                        }
                                        let pv2 = p.value(pj);
                                        if pv2 < av {
                                            pj += 1;
                                        } else {
                                            break;
                                        }
                                    }
                                    let present_eq = if pi < present_rids.len() {
                                        let p = &present_rids[pi];
                                        if pj < p.len() {
                                            p.value(pj) == av
                                        } else {
                                            false
                                        }
                                    } else {
                                        false
                                    };
                                    if !present_eq {
                                        buf.push(av);
                                    }
                                    if present_eq {
                                        pj += 1;
                                    }
                                    aj += 1;
                                    if buf.len() >= 4096 {
                                        flush(&mut buf)?;
                                    }
                                }
                                ai += 1;
                                aj = 0;
                            }
                            flush(&mut buf)?;
                            Ok(())
                        })
                    }
                } else {
                    vals_res
                }
            } else if opts.include_nulls && opts.nulls_first {
                // No pagination: emit null runs for anchor row ids missing from
                // this column first, then the value-ordered scan.
                let anchor_fid = opts.anchor_row_id_field.ok_or_else(|| {
                    Error::Internal("anchor_row_id_field required when include_nulls=true".into())
                })?;
                let catalog = self.catalog.read().unwrap();
                let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
                let anchor_desc_blob = self
                    .pager
                    .batch_get(&[BatchGet::Raw {
                        key: anchor_desc_pk,
                    }])?
                    .pop()
                    .and_then(|r| match r {
                        GetResult::Raw { bytes, .. } => Some(bytes),
                        _ => None,
                    })
                    .ok_or(Error::NotFound)?;
                let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
                    anchor_desc_blob.as_ref(),
                );
                drop(catalog);
                let mut anchor_rids: Vec<UInt64Array> = Vec::new();
                let mut get_keys: Vec<BatchGet> = Vec::new();
                let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
                for m in crate::store::descriptor::DescriptorIterator::new(
                    self.pager.as_ref(),
                    anchor_desc.head_page_pk,
                ) {
                    let mm = m?;
                    if mm.row_count == 0 {
                        continue;
                    }
                    anchor_metas.push(mm);
                    get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
                }
                if !anchor_metas.is_empty() {
                    let res = self.pager.batch_get(&get_keys)?;
                    for r in res {
                        if let GetResult::Raw { bytes, .. } = r {
                            let any = deserialize_array(bytes)?;
                            let arr = any
                                .as_any()
                                .downcast_ref::<UInt64Array>()
                                .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
                                .clone();
                            anchor_rids.push(arr);
                        }
                    }
                }
                let mut present_rids: Vec<UInt64Array> = Vec::new();
                for mr in &metas_rid {
                    let any = deserialize_array(
                        buffers
                            .rid_by_pk(mr.chunk_pk)
                            .ok_or(Error::NotFound)?
                            .clone(),
                    )?;
                    let arr = any
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| Error::Internal("rid not u64".into()))?
                        .clone();
                    present_rids.push(arr);
                }
                let mut ai = 0usize;
                let mut aj = 0usize;
                let mut pi = 0usize;
                let mut pj = 0usize;
                let mut buf: Vec<u64> = Vec::new();
                let mut flush = |v: &mut Vec<u64>| -> Result<()> {
                    if v.is_empty() {
                        return Ok(());
                    }
                    let arr = UInt64Array::from(std::mem::take(v));
                    let len = arr.len();
                    visitor.null_run(&arr, 0, len);
                    Ok(())
                };
                while ai < anchor_rids.len() {
                    let a = &anchor_rids[ai];
                    while aj < a.len() {
                        let av = a.value(aj);
                        while pi < present_rids.len() {
                            let p = &present_rids[pi];
                            if pj >= p.len() {
                                pi += 1;
                                pj = 0;
                                continue;
                            }
                            let pv2 = p.value(pj);
                            if pv2 < av {
                                pj += 1;
                            } else {
                                break;
                            }
                        }
                        let present_eq = if pi < present_rids.len() {
                            let p = &present_rids[pi];
                            if pj < p.len() {
                                p.value(pj) == av
                            } else {
                                false
                            }
                        } else {
                            false
                        };
                        if !present_eq {
                            buf.push(av);
                        }
                        if present_eq {
                            pj += 1;
                        }
                        aj += 1;
                        if buf.len() >= 4096 {
                            flush(&mut buf)?;
                        }
                    }
                    ai += 1;
                    aj = 0;
                }
                flush(&mut buf)?;
                with_integer_arrow_type!(
                    first_any.data_type().clone(),
                    |ArrowTy| {
                        <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
                            self.pager.as_ref(),
                            &metas_val,
                            &metas_rid,
                            &buffers,
                            visitor,
                        )
                    },
                    Err(Error::Internal("unsupported sorted dtype".into())),
                )
            } else {
                // Emit the value-ordered scan first; when include_nulls is set,
                // append null runs for anchor rows missing from this column.
                let res = with_integer_arrow_type!(
                    first_any.data_type().clone(),
                    |ArrowTy| {
                        <ArrowTy as sorted::SortedDispatch>::visit_with_rids(
                            self.pager.as_ref(),
                            &metas_val,
                            &metas_rid,
                            &buffers,
                            visitor,
                        )
                    },
                    Err(Error::Internal("unsupported sorted dtype".into())),
                );
                if opts.include_nulls {
                    res?;
                    let anchor_fid = opts.anchor_row_id_field.ok_or_else(|| {
                        Error::Internal(
                            "anchor_row_id_field required when include_nulls=true".into(),
                        )
                    })?;
                    let catalog = self.catalog.read().unwrap();
                    let anchor_desc_pk = *catalog.map.get(&anchor_fid).ok_or(Error::NotFound)?;
                    let anchor_desc_blob = self
                        .pager
                        .batch_get(&[BatchGet::Raw {
                            key: anchor_desc_pk,
                        }])?
                        .pop()
                        .and_then(|r| match r {
                            GetResult::Raw { bytes, .. } => Some(bytes),
                            _ => None,
                        })
                        .ok_or(Error::NotFound)?;
                    let anchor_desc = crate::store::descriptor::ColumnDescriptor::from_le_bytes(
                        anchor_desc_blob.as_ref(),
                    );
                    drop(catalog);
                    let mut anchor_rids: Vec<UInt64Array> = Vec::new();
                    let mut get_keys: Vec<BatchGet> = Vec::new();
                    let mut anchor_metas: Vec<crate::store::descriptor::ChunkMetadata> = Vec::new();
                    for m in crate::store::descriptor::DescriptorIterator::new(
                        self.pager.as_ref(),
                        anchor_desc.head_page_pk,
                    ) {
                        let mm = m?;
                        if mm.row_count == 0 {
                            continue;
                        }
                        anchor_metas.push(mm);
                        get_keys.push(BatchGet::Raw { key: mm.chunk_pk });
                    }
                    if !anchor_metas.is_empty() {
                        let res_get = self.pager.batch_get(&get_keys)?;
                        for r in res_get {
                            if let GetResult::Raw { bytes, .. } = r {
                                let any = deserialize_array(bytes)?;
                                let arr = any
                                    .as_any()
                                    .downcast_ref::<UInt64Array>()
                                    .ok_or_else(|| Error::Internal("anchor rid not u64".into()))?
                                    .clone();
                                anchor_rids.push(arr);
                            }
                        }
                    }
                    let mut present_rids: Vec<UInt64Array> = Vec::new();
                    for mr in &metas_rid {
                        let any = deserialize_array(
                            buffers
                                .rid_by_pk(mr.chunk_pk)
                                .ok_or(Error::NotFound)?
                                .clone(),
                        )?;
                        let arr = any
                            .as_any()
                            .downcast_ref::<UInt64Array>()
                            .ok_or_else(|| Error::Internal("rid not u64".into()))?
                            .clone();
                        present_rids.push(arr);
                    }
                    let mut ai = 0usize;
                    let mut aj = 0usize;
                    let mut pi = 0usize;
                    let mut pj = 0usize;
                    let mut buf: Vec<u64> = Vec::new();
                    let mut flush = |v: &mut Vec<u64>| -> Result<()> {
                        if v.is_empty() {
                            return Ok(());
                        }
                        let arr = UInt64Array::from(std::mem::take(v));
                        let len = arr.len();
                        visitor.null_run(&arr, 0, len);
                        Ok(())
                    };
                    while ai < anchor_rids.len() {
                        let a = &anchor_rids[ai];
                        while aj < a.len() {
                            let av = a.value(aj);
                            while pi < present_rids.len() {
                                let p = &present_rids[pi];
                                if pj >= p.len() {
                                    pi += 1;
                                    pj = 0;
                                    continue;
                                }
                                let pv2 = p.value(pj);
                                if pv2 < av {
                                    pj += 1;
                                } else {
                                    break;
                                }
                            }
                            let present_eq = if pi < present_rids.len() {
                                let p = &present_rids[pi];
                                if pj < p.len() {
                                    p.value(pj) == av
                                } else {
                                    false
                                }
                            } else {
                                false
                            };
                            if !present_eq {
                                buf.push(av);
                            }
                            if present_eq {
                                pj += 1;
                            }
                            aj += 1;
                            if buf.len() >= 4096 {
                                flush(&mut buf)?;
                            }
                        }
                        ai += 1;
                        aj = 0;
                    }
                    flush(&mut buf)?;
                    Ok(())
                } else {
                    res
                }
            }
        } else if opts.reverse {
            if paginate {
                let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
                    visitor,
                    opts.offset,
                    opts.limit,
                    true,
                );
                self.scan_sorted_visit_reverse(field_id, &mut pv)
            } else {
                self.scan_sorted_visit_reverse(field_id, visitor)
            }
        } else if paginate {
            let mut pv = crate::store::scan::PaginateVisitor::new_with_reverse(
                visitor,
                opts.offset,
                opts.limit,
                false,
            );
            self.scan_sorted_visit(field_id, &mut pv)
        } else {
            self.scan_sorted_visit(field_id, visitor)
        }
    }
}