1use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11pub enum IndexProjection {
12 All,
14 KeysOnly,
16 Include(Vec<String>),
18}
19
20impl Default for IndexProjection {
21 fn default() -> Self {
22 IndexProjection::All
23 }
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31pub struct LocalSecondaryIndex {
32 pub name: String,
34 pub sort_key_attribute: String,
36 pub projection: IndexProjection,
38}
39
40impl LocalSecondaryIndex {
41 pub fn new(name: impl Into<String>, sort_key_attribute: impl Into<String>) -> Self {
43 Self {
44 name: name.into(),
45 sort_key_attribute: sort_key_attribute.into(),
46 projection: IndexProjection::All,
47 }
48 }
49
50 pub fn keys_only(mut self) -> Self {
52 self.projection = IndexProjection::KeysOnly;
53 self
54 }
55
56 pub fn include(mut self, attributes: Vec<String>) -> Self {
58 self.projection = IndexProjection::Include(attributes);
59 self
60 }
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67pub struct GlobalSecondaryIndex {
68 pub name: String,
70 pub partition_key_attribute: String,
72 pub sort_key_attribute: Option<String>,
74 pub projection: IndexProjection,
76}
77
78impl GlobalSecondaryIndex {
79 pub fn new(name: impl Into<String>, partition_key_attribute: impl Into<String>) -> Self {
81 Self {
82 name: name.into(),
83 partition_key_attribute: partition_key_attribute.into(),
84 sort_key_attribute: None,
85 projection: IndexProjection::All,
86 }
87 }
88
89 pub fn with_sort_key(
91 name: impl Into<String>,
92 partition_key_attribute: impl Into<String>,
93 sort_key_attribute: impl Into<String>,
94 ) -> Self {
95 Self {
96 name: name.into(),
97 partition_key_attribute: partition_key_attribute.into(),
98 sort_key_attribute: Some(sort_key_attribute.into()),
99 projection: IndexProjection::All,
100 }
101 }
102
103 pub fn keys_only(mut self) -> Self {
105 self.projection = IndexProjection::KeysOnly;
106 self
107 }
108
109 pub fn include(mut self, attributes: Vec<String>) -> Self {
111 self.projection = IndexProjection::Include(attributes);
112 self
113 }
114}
115
116#[derive(Debug, Clone, Default, Serialize, Deserialize)]
118pub struct TableSchema {
119 pub local_indexes: Vec<LocalSecondaryIndex>,
121 pub global_indexes: Vec<GlobalSecondaryIndex>,
123 pub ttl_attribute_name: Option<String>,
126 #[serde(default)]
128 pub stream_config: crate::stream::StreamConfig,
129}
130
131impl TableSchema {
132 pub fn new() -> Self {
134 Self::default()
135 }
136
137 pub fn add_local_index(mut self, index: LocalSecondaryIndex) -> Self {
139 self.local_indexes.push(index);
140 self
141 }
142
143 pub fn get_local_index(&self, name: &str) -> Option<&LocalSecondaryIndex> {
145 self.local_indexes.iter().find(|idx| idx.name == name)
146 }
147
148 pub fn add_global_index(mut self, index: GlobalSecondaryIndex) -> Self {
150 self.global_indexes.push(index);
151 self
152 }
153
154 pub fn get_global_index(&self, name: &str) -> Option<&GlobalSecondaryIndex> {
156 self.global_indexes.iter().find(|idx| idx.name == name)
157 }
158
159 pub fn with_ttl(mut self, attribute_name: impl Into<String>) -> Self {
164 self.ttl_attribute_name = Some(attribute_name.into());
165 self
166 }
167
168 pub fn with_stream(mut self, config: crate::stream::StreamConfig) -> Self {
173 self.stream_config = config;
174 self
175 }
176
177 pub fn is_expired(&self, item: &crate::Item) -> bool {
184 use crate::Value;
185
186 if let Some(ttl_attr) = &self.ttl_attribute_name {
187 if let Some(ttl_value) = item.get(ttl_attr) {
188 let now = std::time::SystemTime::now()
190 .duration_since(std::time::UNIX_EPOCH)
191 .unwrap()
192 .as_secs() as i64;
193
194 let expires_at = match ttl_value {
196 Value::N(n) => n.parse::<i64>().ok(),
197 Value::Ts(ts) => Some(ts / 1000), _ => None,
199 };
200
201 if let Some(expires) = expires_at {
202 return now > expires;
203 }
204 }
205 }
206
207 false
208 }
209}
210
211pub fn encode_index_key(index_name: &str, pk: &Bytes, index_sk: &Bytes) -> Vec<u8> {
215 const INDEX_MARKER: u8 = 0xFF;
216
217 let index_name_bytes = index_name.as_bytes();
218 let capacity = 1 + 4 + index_name_bytes.len() + 4 + pk.len() + 4 + index_sk.len();
219 let mut buf = Vec::with_capacity(capacity);
220
221 buf.push(INDEX_MARKER);
222 buf.extend_from_slice(&(index_name_bytes.len() as u32).to_le_bytes());
223 buf.extend_from_slice(index_name_bytes);
224 buf.extend_from_slice(&(pk.len() as u32).to_le_bytes());
225 buf.extend_from_slice(pk);
226 buf.extend_from_slice(&(index_sk.len() as u32).to_le_bytes());
227 buf.extend_from_slice(index_sk);
228
229 buf
230}
231
232pub fn decode_index_key(encoded: &[u8]) -> Option<(String, Bytes, Bytes)> {
236 const INDEX_MARKER: u8 = 0xFF;
237
238 if encoded.is_empty() || encoded[0] != INDEX_MARKER {
239 return None;
240 }
241
242 let mut pos = 1;
243
244 if encoded.len() < pos + 4 {
246 return None;
247 }
248 let name_len = u32::from_le_bytes(encoded[pos..pos + 4].try_into().ok()?) as usize;
249 pos += 4;
250
251 if encoded.len() < pos + name_len {
252 return None;
253 }
254 let index_name = String::from_utf8(encoded[pos..pos + name_len].to_vec()).ok()?;
255 pos += name_len;
256
257 if encoded.len() < pos + 4 {
259 return None;
260 }
261 let pk_len = u32::from_le_bytes(encoded[pos..pos + 4].try_into().ok()?) as usize;
262 pos += 4;
263
264 if encoded.len() < pos + pk_len {
265 return None;
266 }
267 let pk = Bytes::copy_from_slice(&encoded[pos..pos + pk_len]);
268 pos += pk_len;
269
270 if encoded.len() < pos + 4 {
272 return None;
273 }
274 let index_sk_len = u32::from_le_bytes(encoded[pos..pos + 4].try_into().ok()?) as usize;
275 pos += 4;
276
277 if encoded.len() < pos + index_sk_len {
278 return None;
279 }
280 let index_sk = Bytes::copy_from_slice(&encoded[pos..pos + index_sk_len]);
281
282 Some((index_name, pk, index_sk))
283}
284
285pub fn is_index_key(encoded: &[u8]) -> bool {
287 const INDEX_MARKER: u8 = 0xFF;
288 !encoded.is_empty() && encoded[0] == INDEX_MARKER
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294
295 #[test]
296 fn test_lsi_creation() {
297 let lsi = LocalSecondaryIndex::new("email-index", "email");
298 assert_eq!(lsi.name, "email-index");
299 assert_eq!(lsi.sort_key_attribute, "email");
300 assert_eq!(lsi.projection, IndexProjection::All);
301 }
302
303 #[test]
304 fn test_lsi_keys_only() {
305 let lsi = LocalSecondaryIndex::new("email-index", "email").keys_only();
306 assert_eq!(lsi.projection, IndexProjection::KeysOnly);
307 }
308
309 #[test]
310 fn test_lsi_include() {
311 let lsi = LocalSecondaryIndex::new("email-index", "email")
312 .include(vec!["name".to_string(), "age".to_string()]);
313 assert_eq!(
314 lsi.projection,
315 IndexProjection::Include(vec!["name".to_string(), "age".to_string()])
316 );
317 }
318
319 #[test]
320 fn test_table_schema() {
321 let schema = TableSchema::new()
322 .add_local_index(LocalSecondaryIndex::new("idx1", "attr1"))
323 .add_local_index(LocalSecondaryIndex::new("idx2", "attr2"));
324
325 assert_eq!(schema.local_indexes.len(), 2);
326 assert!(schema.get_local_index("idx1").is_some());
327 assert!(schema.get_local_index("idx3").is_none());
328 }
329
330 #[test]
331 fn test_encode_decode_index_key() {
332 let index_name = "email-index";
333 let pk = Bytes::from("user#123");
334 let index_sk = Bytes::from("alice@example.com");
335
336 let encoded = encode_index_key(index_name, &pk, &index_sk);
337 assert!(is_index_key(&encoded));
338
339 let (decoded_name, decoded_pk, decoded_sk) = decode_index_key(&encoded).unwrap();
340 assert_eq!(decoded_name, index_name);
341 assert_eq!(decoded_pk, pk);
342 assert_eq!(decoded_sk, index_sk);
343 }
344
345 #[test]
346 fn test_is_not_index_key() {
347 let base_key = vec![0, 0, 0, 4, b'u', b's', b'e', b'r'];
349 assert!(!is_index_key(&base_key));
350 }
351
352 #[test]
353 fn test_gsi_creation() {
354 let gsi = GlobalSecondaryIndex::new("status-index", "status");
355 assert_eq!(gsi.name, "status-index");
356 assert_eq!(gsi.partition_key_attribute, "status");
357 assert_eq!(gsi.sort_key_attribute, None);
358 assert_eq!(gsi.projection, IndexProjection::All);
359 }
360
361 #[test]
362 fn test_gsi_with_sort_key() {
363 let gsi = GlobalSecondaryIndex::with_sort_key("user-index", "userId", "timestamp");
364 assert_eq!(gsi.name, "user-index");
365 assert_eq!(gsi.partition_key_attribute, "userId");
366 assert_eq!(gsi.sort_key_attribute, Some("timestamp".to_string()));
367 }
368
369 #[test]
370 fn test_gsi_keys_only() {
371 let gsi = GlobalSecondaryIndex::new("status-index", "status").keys_only();
372 assert_eq!(gsi.projection, IndexProjection::KeysOnly);
373 }
374
375 #[test]
376 fn test_table_schema_with_gsi() {
377 let schema = TableSchema::new()
378 .add_local_index(LocalSecondaryIndex::new("lsi1", "attr1"))
379 .add_global_index(GlobalSecondaryIndex::new("gsi1", "attr2"))
380 .add_global_index(GlobalSecondaryIndex::with_sort_key("gsi2", "attr3", "attr4"));
381
382 assert_eq!(schema.local_indexes.len(), 1);
383 assert_eq!(schema.global_indexes.len(), 2);
384 assert!(schema.get_global_index("gsi1").is_some());
385 assert!(schema.get_global_index("gsi3").is_none());
386 }
387
388 #[test]
389 fn test_ttl_schema() {
390 let schema = TableSchema::new().with_ttl("expiresAt");
391
392 assert_eq!(schema.ttl_attribute_name, Some("expiresAt".to_string()));
393 }
394
395 #[test]
396 fn test_ttl_expired_item() {
397 use crate::Value;
398 use std::collections::HashMap;
399
400 let schema = TableSchema::new().with_ttl("ttl");
401
402 let now = std::time::SystemTime::now()
404 .duration_since(std::time::UNIX_EPOCH)
405 .unwrap()
406 .as_secs() as i64;
407 let expired_time = now - 100;
408
409 let mut item = HashMap::new();
410 item.insert("name".to_string(), Value::string("test"));
411 item.insert("ttl".to_string(), Value::number(expired_time));
412
413 assert!(schema.is_expired(&item));
414 }
415
416 #[test]
417 fn test_ttl_not_expired_item() {
418 use crate::Value;
419 use std::collections::HashMap;
420
421 let schema = TableSchema::new().with_ttl("ttl");
422
423 let now = std::time::SystemTime::now()
425 .duration_since(std::time::UNIX_EPOCH)
426 .unwrap()
427 .as_secs() as i64;
428 let future_time = now + 100;
429
430 let mut item = HashMap::new();
431 item.insert("name".to_string(), Value::string("test"));
432 item.insert("ttl".to_string(), Value::number(future_time));
433
434 assert!(!schema.is_expired(&item));
435 }
436
437 #[test]
438 fn test_ttl_no_ttl_attribute() {
439 use crate::Value;
440 use std::collections::HashMap;
441
442 let schema = TableSchema::new().with_ttl("ttl");
443
444 let mut item = HashMap::new();
446 item.insert("name".to_string(), Value::string("test"));
447
448 assert!(!schema.is_expired(&item));
449 }
450
451 #[test]
452 fn test_ttl_disabled() {
453 use crate::Value;
454 use std::collections::HashMap;
455
456 let schema = TableSchema::new(); let mut item = HashMap::new();
459 item.insert("name".to_string(), Value::string("test"));
460 item.insert("ttl".to_string(), Value::number(0)); assert!(!schema.is_expired(&item));
463 }
464
465 #[test]
466 fn test_ttl_with_timestamp_value() {
467 use crate::Value;
468 use std::collections::HashMap;
469
470 let schema = TableSchema::new().with_ttl("expiresAt");
471
472 let now_millis = std::time::SystemTime::now()
474 .duration_since(std::time::UNIX_EPOCH)
475 .unwrap()
476 .as_millis() as i64;
477 let expired_millis = now_millis - 100_000; let mut item = HashMap::new();
480 item.insert("name".to_string(), Value::string("test"));
481 item.insert("expiresAt".to_string(), Value::Ts(expired_millis));
482
483 assert!(schema.is_expired(&item));
484 }
485}