velesdb_core/collection/core/scroll.rs
1//! Scroll cursor for paginated iteration over collection points.
2//!
3//! Provides `ScrollBatch` and `Collection::scroll_batch` for deterministic,
4//! ascending-ID iteration with optional payload filtering.
5
6use crate::collection::types::Collection;
7use crate::error::{Error, Result};
8use crate::filter::Filter;
9use crate::point::Point;
10use crate::storage::{PayloadStorage, VectorStorage};
11
12/// Result of a single scroll batch operation.
13///
14/// Contains the points in this batch (ascending ID order) and the cursor
15/// position for resuming iteration.
16#[derive(Debug, Clone)]
17pub struct ScrollBatch {
18 /// Points in this batch, ordered by ascending ID.
19 pub points: Vec<Point>,
20 /// Cursor for the next batch (`None` if no more points).
21 /// This is the ID of the last point in this batch.
22 pub next_cursor: Option<u64>,
23}
24
25impl Collection {
26 /// Returns the next batch of points starting after `cursor`.
27 ///
28 /// - `cursor`: `None` to start from the beginning, `Some(id)` to resume
29 /// after the given point ID (exclusive).
30 /// - `batch_size`: Maximum number of points to return. Must be > 0.
31 /// - `filter`: Optional payload filter. Points not matching are skipped.
32 ///
33 /// Points are returned in ascending ID order for deterministic iteration.
34 ///
35 /// # Errors
36 ///
37 /// Returns `Error::Config` if `batch_size` is 0.
38 pub fn scroll_batch(
39 &self,
40 cursor: Option<u64>,
41 batch_size: usize,
42 filter: Option<&Filter>,
43 ) -> Result<ScrollBatch> {
44 if batch_size == 0 {
45 return Err(Error::Config(
46 "batch_size must be greater than 0".to_string(),
47 ));
48 }
49
50 // all_point_ids() returns IDs pre-sorted via BTreeSet (see crud_read_delete.rs).
51 // Binary search via partition_point is O(log N) per batch.
52 let ids = self.all_point_ids();
53
54 let start = match cursor {
55 Some(c) => ids.partition_point(|&id| id <= c),
56 None => 0,
57 };
58
59 let candidates = &ids[start..];
60 let points = self.collect_filtered_batch(candidates, batch_size, filter);
61
62 let next_cursor = points.last().map(|p| p.id);
63 Ok(ScrollBatch {
64 points,
65 next_cursor,
66 })
67 }
68
69 /// Collects up to `batch_size` points from `candidate_ids`, applying an optional filter.
70 fn collect_filtered_batch(
71 &self,
72 candidate_ids: &[u64],
73 batch_size: usize,
74 filter: Option<&Filter>,
75 ) -> Vec<Point> {
76 let config = self.config.read();
77 let is_metadata_only = config.metadata_only;
78 drop(config);
79
80 let payload_storage = self.payload_storage.read();
81 let vector_storage = self.vector_storage.read();
82
83 let mut points = Vec::with_capacity(batch_size);
84 for &id in candidate_ids {
85 if points.len() >= batch_size {
86 break;
87 }
88 if let Some(point) =
89 Self::build_point(id, is_metadata_only, &*payload_storage, &*vector_storage)
90 {
91 if Self::passes_filter(&point, filter) {
92 points.push(point);
93 }
94 }
95 }
96 points
97 }
98
99 /// Builds a `Point` from storage. Always returns `Some`; points without a
100 /// stored vector get an empty vector slice.
101 #[allow(clippy::unnecessary_wraps)] // Reason: Option return used by caller's if-let pattern
102 fn build_point(
103 id: u64,
104 is_metadata_only: bool,
105 payload_storage: &dyn PayloadStorage,
106 vector_storage: &dyn VectorStorage,
107 ) -> Option<Point> {
108 let payload = payload_storage.retrieve(id).ok().flatten();
109 // Graph nodes inserted via upsert_node_payload() have no vector in storage.
110 // Use unwrap_or_default() so payload-only nodes are included, not silently skipped.
111 let vector = if is_metadata_only {
112 Vec::new()
113 } else {
114 vector_storage
115 .retrieve(id)
116 .ok()
117 .flatten()
118 .unwrap_or_default()
119 };
120 Some(Point {
121 id,
122 vector,
123 payload,
124 sparse_vectors: None,
125 })
126 }
127
128 /// Returns `true` if the point passes the optional filter.
129 fn passes_filter(point: &Point, filter: Option<&Filter>) -> bool {
130 match (filter, &point.payload) {
131 (Some(f), Some(payload)) => f.matches(payload),
132 (Some(_), None) => false,
133 (None, _) => true,
134 }
135 }
136}