kstone_core/
iterator.rs

1/// Iterator support for Query and Scan operations
2///
3/// Provides efficient iteration over memtable and SST files within a stripe,
4/// merging results with proper ordering (newest version wins).
5
6use crate::{Key, Item};
7use bytes::Bytes;
8
9/// Sort key comparison operator
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum SortKeyCondition {
12    /// sk = value
13    Equal,
14    /// sk < value
15    LessThan,
16    /// sk <= value
17    LessThanOrEqual,
18    /// sk > value
19    GreaterThan,
20    /// sk >= value
21    GreaterThanOrEqual,
22    /// sk BETWEEN value1 AND value2
23    Between,
24    /// sk begins_with value
25    BeginsWith,
26}
27
28/// Query parameters for stripe iteration
29#[derive(Debug, Clone)]
30pub struct QueryParams {
31    /// Partition key (required)
32    pub pk: Bytes,
33    /// Sort key condition (optional - if None, return all items with PK)
34    pub sk_condition: Option<(SortKeyCondition, Bytes, Option<Bytes>)>,
35    /// Scan direction
36    pub forward: bool,
37    /// Maximum items to return
38    pub limit: Option<usize>,
39    /// Start key for pagination (exclusive)
40    pub start_key: Option<Key>,
41    /// Index name for LSI queries (Phase 3.1+)
42    pub index_name: Option<String>,
43}
44
45impl QueryParams {
46    /// Create a new query for a partition key
47    pub fn new(pk: Bytes) -> Self {
48        Self {
49            pk,
50            sk_condition: None,
51            forward: true,
52            limit: None,
53            start_key: None,
54            index_name: None,
55        }
56    }
57
58    /// Add sort key condition
59    pub fn with_sk_condition(
60        mut self,
61        condition: SortKeyCondition,
62        value: Bytes,
63        value2: Option<Bytes>,
64    ) -> Self {
65        self.sk_condition = Some((condition, value, value2));
66        self
67    }
68
69    /// Set scan direction
70    pub fn with_direction(mut self, forward: bool) -> Self {
71        self.forward = forward;
72        self
73    }
74
75    /// Set limit
76    pub fn with_limit(mut self, limit: usize) -> Self {
77        self.limit = Some(limit);
78        self
79    }
80
81    /// Set start key for pagination
82    pub fn with_start_key(mut self, key: Key) -> Self {
83        self.start_key = Some(key);
84        self
85    }
86
87    /// Set index name for LSI query (Phase 3.1+)
88    pub fn with_index_name(mut self, index_name: impl Into<String>) -> Self {
89        self.index_name = Some(index_name.into());
90        self
91    }
92
93    /// Check if a sort key matches the condition
94    pub fn matches_sk(&self, sk: &Option<Bytes>) -> bool {
95        match &self.sk_condition {
96            None => true, // No condition - accept all
97            Some((condition, value, value2)) => {
98                let sk_bytes = match sk {
99                    Some(b) => b,
100                    None => return false, // Has condition but item has no SK
101                };
102
103                match condition {
104                    SortKeyCondition::Equal => sk_bytes == value,
105                    SortKeyCondition::LessThan => sk_bytes < value,
106                    SortKeyCondition::LessThanOrEqual => sk_bytes <= value,
107                    SortKeyCondition::GreaterThan => sk_bytes > value,
108                    SortKeyCondition::GreaterThanOrEqual => sk_bytes >= value,
109                    SortKeyCondition::Between => {
110                        if let Some(v2) = value2 {
111                            sk_bytes >= value && sk_bytes <= v2
112                        } else {
113                            false
114                        }
115                    }
116                    SortKeyCondition::BeginsWith => sk_bytes.starts_with(value.as_ref()),
117                }
118            }
119        }
120    }
121
122    /// Check if we should skip a key based on pagination start_key
123    pub fn should_skip(&self, key: &Key) -> bool {
124        if let Some(start) = &self.start_key {
125            if self.forward {
126                // Forward: skip if key <= start_key
127                key <= start
128            } else {
129                // Backward: skip if key >= start_key
130                key >= start
131            }
132        } else {
133            false
134        }
135    }
136}
137
138/// Query result with pagination support
139#[derive(Debug, Clone)]
140pub struct QueryResult {
141    /// Items found
142    pub items: Vec<Item>,
143    /// Last evaluated key (for pagination)
144    pub last_key: Option<Key>,
145    /// Count of items examined (before filter)
146    pub scanned_count: usize,
147}
148
149impl QueryResult {
150    pub fn new(items: Vec<Item>, last_key: Option<Key>, scanned_count: usize) -> Self {
151        Self {
152            items,
153            last_key,
154            scanned_count,
155        }
156    }
157}
158
159/// Scan parameters for table/stripe scanning
160#[derive(Debug, Clone)]
161pub struct ScanParams {
162    /// Maximum items to return
163    pub limit: Option<usize>,
164    /// Start key for pagination (exclusive)
165    pub start_key: Option<Key>,
166    /// Segment number (for parallel scans, 0-based)
167    pub segment: Option<usize>,
168    /// Total number of segments (for parallel scans)
169    pub total_segments: Option<usize>,
170}
171
172impl ScanParams {
173    /// Create new scan parameters
174    pub fn new() -> Self {
175        Self {
176            limit: None,
177            start_key: None,
178            segment: None,
179            total_segments: None,
180        }
181    }
182
183    /// Set limit
184    pub fn with_limit(mut self, limit: usize) -> Self {
185        self.limit = Some(limit);
186        self
187    }
188
189    /// Set start key for pagination
190    pub fn with_start_key(mut self, key: Key) -> Self {
191        self.start_key = Some(key);
192        self
193    }
194
195    /// Set parallel scan parameters
196    pub fn with_segment(mut self, segment: usize, total_segments: usize) -> Self {
197        self.segment = Some(segment);
198        self.total_segments = Some(total_segments);
199        self
200    }
201
202    /// Check if a stripe should be scanned by this segment
203    pub fn should_scan_stripe(&self, stripe_id: usize) -> bool {
204        match (self.segment, self.total_segments) {
205            (Some(seg), Some(total)) => {
206                // Distribute stripes across segments
207                stripe_id % total == seg
208            }
209            _ => true, // No parallel scan - scan all stripes
210        }
211    }
212
213    /// Check if we should skip a key based on pagination start_key
214    pub fn should_skip(&self, key: &Key) -> bool {
215        if let Some(start) = &self.start_key {
216            // Skip if key <= start_key
217            key <= start
218        } else {
219            false
220        }
221    }
222}
223
224impl Default for ScanParams {
225    fn default() -> Self {
226        Self::new()
227    }
228}
229
230/// Scan result (same structure as QueryResult)
231pub type ScanResult = QueryResult;
232
233#[cfg(test)]
234mod tests {
235    use super::*;
236
237    #[test]
238    fn test_query_params_sk_equal() {
239        let params = QueryParams::new(Bytes::from("pk1"))
240            .with_sk_condition(SortKeyCondition::Equal, Bytes::from("sk1"), None);
241
242        assert!(params.matches_sk(&Some(Bytes::from("sk1"))));
243        assert!(!params.matches_sk(&Some(Bytes::from("sk2"))));
244        assert!(!params.matches_sk(&None));
245    }
246
247    #[test]
248    fn test_query_params_sk_less_than() {
249        let params = QueryParams::new(Bytes::from("pk1"))
250            .with_sk_condition(SortKeyCondition::LessThan, Bytes::from("sk5"), None);
251
252        assert!(params.matches_sk(&Some(Bytes::from("sk1"))));
253        assert!(params.matches_sk(&Some(Bytes::from("sk4"))));
254        assert!(!params.matches_sk(&Some(Bytes::from("sk5"))));
255        assert!(!params.matches_sk(&Some(Bytes::from("sk6"))));
256    }
257
258    #[test]
259    fn test_query_params_sk_between() {
260        let params = QueryParams::new(Bytes::from("pk1")).with_sk_condition(
261            SortKeyCondition::Between,
262            Bytes::from("sk2"),
263            Some(Bytes::from("sk5")),
264        );
265
266        assert!(!params.matches_sk(&Some(Bytes::from("sk1"))));
267        assert!(params.matches_sk(&Some(Bytes::from("sk2"))));
268        assert!(params.matches_sk(&Some(Bytes::from("sk3"))));
269        assert!(params.matches_sk(&Some(Bytes::from("sk5"))));
270        assert!(!params.matches_sk(&Some(Bytes::from("sk6"))));
271    }
272
273    #[test]
274    fn test_query_params_sk_begins_with() {
275        let params = QueryParams::new(Bytes::from("pk1"))
276            .with_sk_condition(SortKeyCondition::BeginsWith, Bytes::from("user#"), None);
277
278        assert!(params.matches_sk(&Some(Bytes::from("user#123"))));
279        assert!(params.matches_sk(&Some(Bytes::from("user#456"))));
280        assert!(!params.matches_sk(&Some(Bytes::from("post#123"))));
281        assert!(!params.matches_sk(&Some(Bytes::from("user"))));
282    }
283
284    #[test]
285    fn test_query_params_no_condition() {
286        let params = QueryParams::new(Bytes::from("pk1"));
287
288        assert!(params.matches_sk(&Some(Bytes::from("sk1"))));
289        assert!(params.matches_sk(&Some(Bytes::from("anything"))));
290        assert!(params.matches_sk(&None));
291    }
292
293    #[test]
294    fn test_query_params_pagination_skip() {
295        let params = QueryParams::new(Bytes::from("pk1"))
296            .with_start_key(Key::with_sk(b"pk1".to_vec(), b"sk3".to_vec()))
297            .with_direction(true); // forward
298
299        let key1 = Key::with_sk(b"pk1".to_vec(), b"sk1".to_vec());
300        let key2 = Key::with_sk(b"pk1".to_vec(), b"sk3".to_vec());
301        let key3 = Key::with_sk(b"pk1".to_vec(), b"sk5".to_vec());
302
303        assert!(params.should_skip(&key1)); // sk1 < sk3
304        assert!(params.should_skip(&key2)); // sk3 == sk3
305        assert!(!params.should_skip(&key3)); // sk5 > sk3
306    }
307}