kstone_core/
index.rs

//! Index support for LSI (Local Secondary Index) and GSI (Global Secondary Index).
//!
//! Phase 3.1: LSI - alternative sort key on the same partition key.
//! Phase 3.2: GSI - alternative partition key and sort key.
5
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8
9/// Index projection type - which attributes to include in index
10#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11pub enum IndexProjection {
12    /// Project all attributes (default)
13    All,
14    /// Project only key attributes
15    KeysOnly,
16    /// Project specific attributes
17    Include(Vec<String>),
18}
19
20impl Default for IndexProjection {
21    fn default() -> Self {
22        IndexProjection::All
23    }
24}
25
26/// Local Secondary Index definition
27///
28/// LSI shares the same partition key as the base table but uses
29/// a different attribute value as the sort key.
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31pub struct LocalSecondaryIndex {
32    /// Index name (unique per table)
33    pub name: String,
34    /// Attribute name to use as alternative sort key
35    pub sort_key_attribute: String,
36    /// Which attributes to project into the index
37    pub projection: IndexProjection,
38}
39
40impl LocalSecondaryIndex {
41    /// Create a new LSI with all attributes projected
42    pub fn new(name: impl Into<String>, sort_key_attribute: impl Into<String>) -> Self {
43        Self {
44            name: name.into(),
45            sort_key_attribute: sort_key_attribute.into(),
46            projection: IndexProjection::All,
47        }
48    }
49
50    /// Set projection to keys only
51    pub fn keys_only(mut self) -> Self {
52        self.projection = IndexProjection::KeysOnly;
53        self
54    }
55
56    /// Set projection to include specific attributes
57    pub fn include(mut self, attributes: Vec<String>) -> Self {
58        self.projection = IndexProjection::Include(attributes);
59        self
60    }
61}
62
63/// Global Secondary Index definition (Phase 3.2+)
64///
65/// GSI can use different partition key and sort key from the base table.
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67pub struct GlobalSecondaryIndex {
68    /// Index name (unique per table)
69    pub name: String,
70    /// Attribute name to use as partition key
71    pub partition_key_attribute: String,
72    /// Optional attribute name to use as sort key
73    pub sort_key_attribute: Option<String>,
74    /// Which attributes to project into the index
75    pub projection: IndexProjection,
76}
77
78impl GlobalSecondaryIndex {
79    /// Create a new GSI with partition key only
80    pub fn new(name: impl Into<String>, partition_key_attribute: impl Into<String>) -> Self {
81        Self {
82            name: name.into(),
83            partition_key_attribute: partition_key_attribute.into(),
84            sort_key_attribute: None,
85            projection: IndexProjection::All,
86        }
87    }
88
89    /// Create a new GSI with partition key and sort key
90    pub fn with_sort_key(
91        name: impl Into<String>,
92        partition_key_attribute: impl Into<String>,
93        sort_key_attribute: impl Into<String>,
94    ) -> Self {
95        Self {
96            name: name.into(),
97            partition_key_attribute: partition_key_attribute.into(),
98            sort_key_attribute: Some(sort_key_attribute.into()),
99            projection: IndexProjection::All,
100        }
101    }
102
103    /// Set projection to keys only
104    pub fn keys_only(mut self) -> Self {
105        self.projection = IndexProjection::KeysOnly;
106        self
107    }
108
109    /// Set projection to include specific attributes
110    pub fn include(mut self, attributes: Vec<String>) -> Self {
111        self.projection = IndexProjection::Include(attributes);
112        self
113    }
114}
115
116/// Table schema with index definitions
117#[derive(Debug, Clone, Default, Serialize, Deserialize)]
118pub struct TableSchema {
119    /// Local secondary indexes
120    pub local_indexes: Vec<LocalSecondaryIndex>,
121    /// Global secondary indexes (Phase 3.2+)
122    pub global_indexes: Vec<GlobalSecondaryIndex>,
123    /// TTL attribute name (Phase 3.3+)
124    /// When set, items with this attribute containing a timestamp in the past are considered expired
125    pub ttl_attribute_name: Option<String>,
126    /// Stream configuration (Phase 3.4+)
127    #[serde(default)]
128    pub stream_config: crate::stream::StreamConfig,
129}
130
131impl TableSchema {
132    /// Create an empty schema
133    pub fn new() -> Self {
134        Self::default()
135    }
136
137    /// Add a local secondary index
138    pub fn add_local_index(mut self, index: LocalSecondaryIndex) -> Self {
139        self.local_indexes.push(index);
140        self
141    }
142
143    /// Get LSI by name
144    pub fn get_local_index(&self, name: &str) -> Option<&LocalSecondaryIndex> {
145        self.local_indexes.iter().find(|idx| idx.name == name)
146    }
147
148    /// Add a global secondary index (Phase 3.2+)
149    pub fn add_global_index(mut self, index: GlobalSecondaryIndex) -> Self {
150        self.global_indexes.push(index);
151        self
152    }
153
154    /// Get GSI by name (Phase 3.2+)
155    pub fn get_global_index(&self, name: &str) -> Option<&GlobalSecondaryIndex> {
156        self.global_indexes.iter().find(|idx| idx.name == name)
157    }
158
159    /// Enable TTL (Time To Live) with the specified attribute name (Phase 3.3+)
160    ///
161    /// Items with this attribute containing a Unix timestamp (seconds since epoch)
162    /// in the past will be considered expired and automatically deleted.
163    pub fn with_ttl(mut self, attribute_name: impl Into<String>) -> Self {
164        self.ttl_attribute_name = Some(attribute_name.into());
165        self
166    }
167
168    /// Enable streams (Change Data Capture) with the specified configuration (Phase 3.4+)
169    ///
170    /// Streams capture all item-level modifications (INSERT, MODIFY, REMOVE) and
171    /// make them available for processing.
172    pub fn with_stream(mut self, config: crate::stream::StreamConfig) -> Self {
173        self.stream_config = config;
174        self
175    }
176
177    /// Check if an item is expired based on TTL (Phase 3.3+)
178    ///
179    /// Returns true if:
180    /// - TTL is enabled AND
181    /// - Item has the TTL attribute AND
182    /// - The TTL timestamp is in the past
183    pub fn is_expired(&self, item: &crate::Item) -> bool {
184        use crate::Value;
185
186        if let Some(ttl_attr) = &self.ttl_attribute_name {
187            if let Some(ttl_value) = item.get(ttl_attr) {
188                // Get current time in seconds since epoch
189                let now = std::time::SystemTime::now()
190                    .duration_since(std::time::UNIX_EPOCH)
191                    .unwrap()
192                    .as_secs() as i64;
193
194                // Extract expiration timestamp from item
195                let expires_at = match ttl_value {
196                    Value::N(n) => n.parse::<i64>().ok(),
197                    Value::Ts(ts) => Some(ts / 1000), // Convert millis to seconds
198                    _ => None,
199                };
200
201                if let Some(expires) = expires_at {
202                    return now > expires;
203                }
204            }
205        }
206
207        false
208    }
209}
210
211/// Encode an index key for storage
212///
213/// Format: [INDEX_MARKER | index_name_len | index_name | pk_len | pk | index_sk_len | index_sk]
214pub fn encode_index_key(index_name: &str, pk: &Bytes, index_sk: &Bytes) -> Vec<u8> {
215    const INDEX_MARKER: u8 = 0xFF;
216
217    let index_name_bytes = index_name.as_bytes();
218    let capacity = 1 + 4 + index_name_bytes.len() + 4 + pk.len() + 4 + index_sk.len();
219    let mut buf = Vec::with_capacity(capacity);
220
221    buf.push(INDEX_MARKER);
222    buf.extend_from_slice(&(index_name_bytes.len() as u32).to_le_bytes());
223    buf.extend_from_slice(index_name_bytes);
224    buf.extend_from_slice(&(pk.len() as u32).to_le_bytes());
225    buf.extend_from_slice(pk);
226    buf.extend_from_slice(&(index_sk.len() as u32).to_le_bytes());
227    buf.extend_from_slice(index_sk);
228
229    buf
230}
231
232/// Decode an index key
233///
234/// Returns (index_name, pk, index_sk) or None if not an index key
235pub fn decode_index_key(encoded: &[u8]) -> Option<(String, Bytes, Bytes)> {
236    const INDEX_MARKER: u8 = 0xFF;
237
238    if encoded.is_empty() || encoded[0] != INDEX_MARKER {
239        return None;
240    }
241
242    let mut pos = 1;
243
244    // Read index name
245    if encoded.len() < pos + 4 {
246        return None;
247    }
248    let name_len = u32::from_le_bytes(encoded[pos..pos + 4].try_into().ok()?) as usize;
249    pos += 4;
250
251    if encoded.len() < pos + name_len {
252        return None;
253    }
254    let index_name = String::from_utf8(encoded[pos..pos + name_len].to_vec()).ok()?;
255    pos += name_len;
256
257    // Read pk
258    if encoded.len() < pos + 4 {
259        return None;
260    }
261    let pk_len = u32::from_le_bytes(encoded[pos..pos + 4].try_into().ok()?) as usize;
262    pos += 4;
263
264    if encoded.len() < pos + pk_len {
265        return None;
266    }
267    let pk = Bytes::copy_from_slice(&encoded[pos..pos + pk_len]);
268    pos += pk_len;
269
270    // Read index_sk
271    if encoded.len() < pos + 4 {
272        return None;
273    }
274    let index_sk_len = u32::from_le_bytes(encoded[pos..pos + 4].try_into().ok()?) as usize;
275    pos += 4;
276
277    if encoded.len() < pos + index_sk_len {
278        return None;
279    }
280    let index_sk = Bytes::copy_from_slice(&encoded[pos..pos + index_sk_len]);
281
282    Some((index_name, pk, index_sk))
283}
284
285/// Check if an encoded key is an index key
286pub fn is_index_key(encoded: &[u8]) -> bool {
287    const INDEX_MARKER: u8 = 0xFF;
288    !encoded.is_empty() && encoded[0] == INDEX_MARKER
289}
290
291#[cfg(test)]
292mod tests {
293    use super::*;
294
295    #[test]
296    fn test_lsi_creation() {
297        let lsi = LocalSecondaryIndex::new("email-index", "email");
298        assert_eq!(lsi.name, "email-index");
299        assert_eq!(lsi.sort_key_attribute, "email");
300        assert_eq!(lsi.projection, IndexProjection::All);
301    }
302
303    #[test]
304    fn test_lsi_keys_only() {
305        let lsi = LocalSecondaryIndex::new("email-index", "email").keys_only();
306        assert_eq!(lsi.projection, IndexProjection::KeysOnly);
307    }
308
309    #[test]
310    fn test_lsi_include() {
311        let lsi = LocalSecondaryIndex::new("email-index", "email")
312            .include(vec!["name".to_string(), "age".to_string()]);
313        assert_eq!(
314            lsi.projection,
315            IndexProjection::Include(vec!["name".to_string(), "age".to_string()])
316        );
317    }
318
319    #[test]
320    fn test_table_schema() {
321        let schema = TableSchema::new()
322            .add_local_index(LocalSecondaryIndex::new("idx1", "attr1"))
323            .add_local_index(LocalSecondaryIndex::new("idx2", "attr2"));
324
325        assert_eq!(schema.local_indexes.len(), 2);
326        assert!(schema.get_local_index("idx1").is_some());
327        assert!(schema.get_local_index("idx3").is_none());
328    }
329
330    #[test]
331    fn test_encode_decode_index_key() {
332        let index_name = "email-index";
333        let pk = Bytes::from("user#123");
334        let index_sk = Bytes::from("alice@example.com");
335
336        let encoded = encode_index_key(index_name, &pk, &index_sk);
337        assert!(is_index_key(&encoded));
338
339        let (decoded_name, decoded_pk, decoded_sk) = decode_index_key(&encoded).unwrap();
340        assert_eq!(decoded_name, index_name);
341        assert_eq!(decoded_pk, pk);
342        assert_eq!(decoded_sk, index_sk);
343    }
344
345    #[test]
346    fn test_is_not_index_key() {
347        // Base table key encoding (from types.rs Key::encode)
348        let base_key = vec![0, 0, 0, 4, b'u', b's', b'e', b'r'];
349        assert!(!is_index_key(&base_key));
350    }
351
352    #[test]
353    fn test_gsi_creation() {
354        let gsi = GlobalSecondaryIndex::new("status-index", "status");
355        assert_eq!(gsi.name, "status-index");
356        assert_eq!(gsi.partition_key_attribute, "status");
357        assert_eq!(gsi.sort_key_attribute, None);
358        assert_eq!(gsi.projection, IndexProjection::All);
359    }
360
361    #[test]
362    fn test_gsi_with_sort_key() {
363        let gsi = GlobalSecondaryIndex::with_sort_key("user-index", "userId", "timestamp");
364        assert_eq!(gsi.name, "user-index");
365        assert_eq!(gsi.partition_key_attribute, "userId");
366        assert_eq!(gsi.sort_key_attribute, Some("timestamp".to_string()));
367    }
368
369    #[test]
370    fn test_gsi_keys_only() {
371        let gsi = GlobalSecondaryIndex::new("status-index", "status").keys_only();
372        assert_eq!(gsi.projection, IndexProjection::KeysOnly);
373    }
374
375    #[test]
376    fn test_table_schema_with_gsi() {
377        let schema = TableSchema::new()
378            .add_local_index(LocalSecondaryIndex::new("lsi1", "attr1"))
379            .add_global_index(GlobalSecondaryIndex::new("gsi1", "attr2"))
380            .add_global_index(GlobalSecondaryIndex::with_sort_key("gsi2", "attr3", "attr4"));
381
382        assert_eq!(schema.local_indexes.len(), 1);
383        assert_eq!(schema.global_indexes.len(), 2);
384        assert!(schema.get_global_index("gsi1").is_some());
385        assert!(schema.get_global_index("gsi3").is_none());
386    }
387
388    #[test]
389    fn test_ttl_schema() {
390        let schema = TableSchema::new().with_ttl("expiresAt");
391
392        assert_eq!(schema.ttl_attribute_name, Some("expiresAt".to_string()));
393    }
394
395    #[test]
396    fn test_ttl_expired_item() {
397        use crate::Value;
398        use std::collections::HashMap;
399
400        let schema = TableSchema::new().with_ttl("ttl");
401
402        // Item expired 100 seconds ago
403        let now = std::time::SystemTime::now()
404            .duration_since(std::time::UNIX_EPOCH)
405            .unwrap()
406            .as_secs() as i64;
407        let expired_time = now - 100;
408
409        let mut item = HashMap::new();
410        item.insert("name".to_string(), Value::string("test"));
411        item.insert("ttl".to_string(), Value::number(expired_time));
412
413        assert!(schema.is_expired(&item));
414    }
415
416    #[test]
417    fn test_ttl_not_expired_item() {
418        use crate::Value;
419        use std::collections::HashMap;
420
421        let schema = TableSchema::new().with_ttl("ttl");
422
423        // Item expires 100 seconds in the future
424        let now = std::time::SystemTime::now()
425            .duration_since(std::time::UNIX_EPOCH)
426            .unwrap()
427            .as_secs() as i64;
428        let future_time = now + 100;
429
430        let mut item = HashMap::new();
431        item.insert("name".to_string(), Value::string("test"));
432        item.insert("ttl".to_string(), Value::number(future_time));
433
434        assert!(!schema.is_expired(&item));
435    }
436
437    #[test]
438    fn test_ttl_no_ttl_attribute() {
439        use crate::Value;
440        use std::collections::HashMap;
441
442        let schema = TableSchema::new().with_ttl("ttl");
443
444        // Item has no ttl attribute
445        let mut item = HashMap::new();
446        item.insert("name".to_string(), Value::string("test"));
447
448        assert!(!schema.is_expired(&item));
449    }
450
451    #[test]
452    fn test_ttl_disabled() {
453        use crate::Value;
454        use std::collections::HashMap;
455
456        let schema = TableSchema::new(); // No TTL configured
457
458        let mut item = HashMap::new();
459        item.insert("name".to_string(), Value::string("test"));
460        item.insert("ttl".to_string(), Value::number(0)); // Ancient timestamp
461
462        assert!(!schema.is_expired(&item));
463    }
464
465    #[test]
466    fn test_ttl_with_timestamp_value() {
467        use crate::Value;
468        use std::collections::HashMap;
469
470        let schema = TableSchema::new().with_ttl("expiresAt");
471
472        // Item with Timestamp value type (milliseconds)
473        let now_millis = std::time::SystemTime::now()
474            .duration_since(std::time::UNIX_EPOCH)
475            .unwrap()
476            .as_millis() as i64;
477        let expired_millis = now_millis - 100_000; // 100 seconds ago
478
479        let mut item = HashMap::new();
480        item.insert("name".to_string(), Value::string("test"));
481        item.insert("expiresAt".to_string(), Value::Ts(expired_millis));
482
483        assert!(schema.is_expired(&item));
484    }
485}