Skip to main content

velesdb_core/collection/core/
scroll.rs

1//! Scroll cursor for paginated iteration over collection points.
2//!
3//! Provides `ScrollBatch` and `Collection::scroll_batch` for deterministic,
4//! ascending-ID iteration with optional payload filtering.
5
6use crate::collection::types::Collection;
7use crate::error::{Error, Result};
8use crate::filter::Filter;
9use crate::point::Point;
10use crate::storage::{PayloadStorage, VectorStorage};
11
12/// Result of a single scroll batch operation.
13///
14/// Contains the points in this batch (ascending ID order) and the cursor
15/// position for resuming iteration.
16#[derive(Debug, Clone)]
17pub struct ScrollBatch {
18    /// Points in this batch, ordered by ascending ID.
19    pub points: Vec<Point>,
20    /// Cursor for the next batch (`None` if no more points).
21    /// This is the ID of the last point in this batch.
22    pub next_cursor: Option<u64>,
23}
24
25impl Collection {
26    /// Returns the next batch of points starting after `cursor`.
27    ///
28    /// - `cursor`: `None` to start from the beginning, `Some(id)` to resume
29    ///   after the given point ID (exclusive).
30    /// - `batch_size`: Maximum number of points to return. Must be > 0.
31    /// - `filter`: Optional payload filter. Points not matching are skipped.
32    ///
33    /// Points are returned in ascending ID order for deterministic iteration.
34    ///
35    /// # Errors
36    ///
37    /// Returns `Error::Config` if `batch_size` is 0.
38    pub fn scroll_batch(
39        &self,
40        cursor: Option<u64>,
41        batch_size: usize,
42        filter: Option<&Filter>,
43    ) -> Result<ScrollBatch> {
44        if batch_size == 0 {
45            return Err(Error::Config(
46                "batch_size must be greater than 0".to_string(),
47            ));
48        }
49
50        // all_point_ids() returns IDs pre-sorted via BTreeSet (see crud_read_delete.rs).
51        // Binary search via partition_point is O(log N) per batch.
52        let ids = self.all_point_ids();
53
54        let start = match cursor {
55            Some(c) => ids.partition_point(|&id| id <= c),
56            None => 0,
57        };
58
59        let candidates = &ids[start..];
60        let points = self.collect_filtered_batch(candidates, batch_size, filter);
61
62        let next_cursor = points.last().map(|p| p.id);
63        Ok(ScrollBatch {
64            points,
65            next_cursor,
66        })
67    }
68
69    /// Collects up to `batch_size` points from `candidate_ids`, applying an optional filter.
70    fn collect_filtered_batch(
71        &self,
72        candidate_ids: &[u64],
73        batch_size: usize,
74        filter: Option<&Filter>,
75    ) -> Vec<Point> {
76        let config = self.config.read();
77        let is_metadata_only = config.metadata_only;
78        drop(config);
79
80        let payload_storage = self.payload_storage.read();
81        let vector_storage = self.vector_storage.read();
82
83        let mut points = Vec::with_capacity(batch_size);
84        for &id in candidate_ids {
85            if points.len() >= batch_size {
86                break;
87            }
88            if let Some(point) =
89                Self::build_point(id, is_metadata_only, &*payload_storage, &*vector_storage)
90            {
91                if Self::passes_filter(&point, filter) {
92                    points.push(point);
93                }
94            }
95        }
96        points
97    }
98
99    /// Builds a `Point` from storage. Always returns `Some`; points without a
100    /// stored vector get an empty vector slice.
101    #[allow(clippy::unnecessary_wraps)] // Reason: Option return used by caller's if-let pattern
102    fn build_point(
103        id: u64,
104        is_metadata_only: bool,
105        payload_storage: &dyn PayloadStorage,
106        vector_storage: &dyn VectorStorage,
107    ) -> Option<Point> {
108        let payload = payload_storage.retrieve(id).ok().flatten();
109        // Graph nodes inserted via upsert_node_payload() have no vector in storage.
110        // Use unwrap_or_default() so payload-only nodes are included, not silently skipped.
111        let vector = if is_metadata_only {
112            Vec::new()
113        } else {
114            vector_storage
115                .retrieve(id)
116                .ok()
117                .flatten()
118                .unwrap_or_default()
119        };
120        Some(Point {
121            id,
122            vector,
123            payload,
124            sparse_vectors: None,
125        })
126    }
127
128    /// Returns `true` if the point passes the optional filter.
129    fn passes_filter(point: &Point, filter: Option<&Filter>) -> bool {
130        match (filter, &point.payload) {
131            (Some(f), Some(payload)) => f.matches(payload),
132            (Some(_), None) => false,
133            (None, _) => true,
134        }
135    }
136}