1use crate::errors::{DynoxideError, Result};
2use crate::types::AttributeValue;
3use rusqlite::{Connection, params};
4use std::cell::RefCell;
5use std::collections::HashMap;
6
7const SCHEMA_VERSION: &str = "6";
9
10const HASH_BUCKETS: u32 = 4096;
13
14pub fn compute_hash_prefix(pk_value: &AttributeValue) -> String {
28 let key_bytes = match pk_value {
29 AttributeValue::S(s) => s.as_bytes().to_vec(),
30 AttributeValue::N(n) => num_to_buffer(n),
31 AttributeValue::B(b) => b.clone(),
32 _ => vec![], };
34
35 let digest = md5::compute([b"Outliers" as &[u8], &key_bytes].concat());
36 format!("{:032x}", digest)[..6].to_string()
37}
38
39pub fn hash_bucket(hash_prefix: &str) -> u32 {
41 let prefix_3 = &hash_prefix[..3.min(hash_prefix.len())];
42 u32::from_str_radix(prefix_3, 16).unwrap_or(0)
43}
44
45fn num_to_buffer(num_str: &str) -> Vec<u8> {
51 let trimmed = num_str.trim();
52 if trimmed.is_empty() {
53 return vec![0x80];
54 }
55
56 use bigdecimal::BigDecimal;
57 use std::str::FromStr;
58
59 let bd = match BigDecimal::from_str(trimmed) {
60 Ok(v) => v,
61 Err(_) => return vec![0x80],
62 };
63
64 if bd.sign() == bigdecimal::num_bigint::Sign::NoSign {
65 return vec![0x80];
66 }
67
68 let is_negative = bd.sign() == bigdecimal::num_bigint::Sign::Minus;
69 let bd_abs = if is_negative { -&bd } else { bd.clone() };
70
71 let (mantissa, exponent) = extract_mantissa_and_exponent(&bd_abs);
72 if mantissa.is_empty() {
73 return vec![0x80];
74 }
75
76 let append_zero: i64 = if exponent % 2 != 0 { 1 } else { 0 };
79 let byte_len_no_exp = ((mantissa.len() as i64 + append_zero + 1) / 2) as usize;
80
81 let mut byte_array: Vec<u8>;
82 if byte_len_no_exp < 20 && is_negative {
83 byte_array = vec![0u8; byte_len_no_exp + 2];
84 byte_array[byte_len_no_exp + 1] = 102;
85 } else {
86 byte_array = vec![0u8; byte_len_no_exp + 1];
87 }
88
89 let exp_sum = exponent + append_zero;
93 let exp_byte_val = floor_div(exp_sum, 2) - 64;
94 if is_negative {
95 byte_array[0] = (exp_byte_val ^ !0i64) as u8;
98 } else {
99 byte_array[0] = exp_byte_val as u8;
100 }
101
102 let mut mi: i64 = 0; let mlen = mantissa.len() as i64;
106 let mut appended_zero = false;
107
108 while mi < mlen {
109 let bai = ((mi + append_zero) / 2 + 1) as usize; if append_zero != 0 && mi == 0 && !appended_zero {
111 byte_array[bai] = 0;
112 appended_zero = true;
113 mi -= 1; } else if (mi + append_zero) % 2 == 0 {
115 byte_array[bai] = mantissa[mi as usize] * 10;
116 } else {
117 byte_array[bai] += mantissa[mi as usize];
118 }
119
120 if ((mi + append_zero) % 2 != 0) || (mi == mlen - 1) {
122 if is_negative {
123 byte_array[bai] = 101u8.wrapping_sub(byte_array[bai]);
124 } else {
125 byte_array[bai] = byte_array[bai].wrapping_add(1);
126 }
127 }
128
129 mi += 1; }
131
132 byte_array
133}
134
135fn floor_div(a: i64, b: i64) -> i64 {
137 let d = a / b;
138 let r = a % b;
139 if (r != 0) && ((r ^ b) < 0) { d - 1 } else { d }
140}
141
142fn extract_mantissa_and_exponent(bd: &bigdecimal::BigDecimal) -> (Vec<u8>, i64) {
149 let normalized = bd.normalized();
151
152 let (bigint, scale) = normalized.as_bigint_and_exponent();
155 let digits_str = bigint.to_string();
156 let digits_str = digits_str.trim_start_matches('-');
157
158 let digits: Vec<u8> = digits_str
159 .chars()
160 .map(|c| c.to_digit(10).unwrap() as u8)
161 .collect();
162
163 let exponent = digits.len() as i64 - scale;
166
167 (digits, exponent)
168}
169
170pub fn hash_in_segment(hash_prefix: &str, segment: u32, total_segments: u32) -> bool {
177 let bucket = hash_bucket(hash_prefix);
178 let start = ceiling_div(HASH_BUCKETS * segment, total_segments);
179 let end = ceiling_div(HASH_BUCKETS * (segment + 1), total_segments) - 1;
180 bucket >= start && bucket <= end
181}
182
183fn ceiling_div(a: u32, b: u32) -> u32 {
184 a.div_ceil(b)
185}
186
187#[derive(Debug, Default)]
189pub struct ScanParams<'a> {
190 pub limit: Option<usize>,
191 pub exclusive_start_pk: Option<&'a str>,
192 pub exclusive_start_sk: Option<&'a str>,
193 pub segment: Option<u32>,
194 pub total_segments: Option<u32>,
195 pub exclusive_start_base_pk: Option<&'a str>,
197 pub exclusive_start_base_sk: Option<&'a str>,
199}
200
201#[derive(Debug, Default)]
203pub struct CreateTableMetadata<'a> {
204 pub table_name: &'a str,
205 pub key_schema: &'a str,
206 pub attribute_definitions: &'a str,
207 pub gsi_definitions: Option<&'a str>,
208 pub lsi_definitions: Option<&'a str>,
209 pub provisioned_throughput: Option<&'a str>,
210 pub created_at: i64,
211 pub sse_specification: Option<&'a str>,
212 pub table_class: Option<&'a str>,
213 pub deletion_protection_enabled: bool,
214 pub billing_mode: Option<&'a str>,
215}
216
217#[derive(Debug, Default)]
219pub struct QueryParams<'a> {
220 pub sk_condition: Option<&'a str>,
221 pub sk_params: &'a [&'a str],
222 pub forward: bool,
223 pub limit: Option<usize>,
224 pub exclusive_start_sk: Option<&'a str>,
225 pub exclusive_start_base_pk: Option<&'a str>,
227 pub exclusive_start_base_sk: Option<&'a str>,
229}
230
231pub struct Storage {
236 conn: Connection,
237 metadata_cache: RefCell<HashMap<String, TableMetadata>>,
240}
241
242impl Storage {
243 pub fn new(path: &str) -> Result<Self> {
245 let conn = Connection::open(path)?;
246 let mut storage = Self {
247 conn,
248 metadata_cache: RefCell::new(HashMap::new()),
249 };
250 storage.initialize().map_err(Self::maybe_encrypted_error)?;
251 Ok(storage)
252 }
253
254 fn maybe_encrypted_error(err: DynoxideError) -> DynoxideError {
257 if let DynoxideError::SqliteError(ref sqlite_err) = err {
258 if let Some(rusqlite::ErrorCode::NotADatabase) = sqlite_err.sqlite_error_code() {
259 return DynoxideError::InternalServerError(
260 "Database file is encrypted or not a valid SQLite database. \
261 If encrypted, enable the `encryption` or `encryption-cc` feature \
262 and use Database::new_encrypted() with the correct key."
263 .to_string(),
264 );
265 }
266 }
267 err
268 }
269
270 #[cfg(feature = "_has-encryption")]
276 pub fn new_encrypted(path: &str, key: &str) -> Result<Self> {
277 use zeroize::Zeroize;
278
279 let conn = Connection::open(path)?;
280 let mut pragma_val = format!("x'{key}'");
285 conn.pragma_update(None, "key", &pragma_val)?;
286 pragma_val.zeroize();
287 conn.execute_batch("SELECT count(*) FROM sqlite_master;")?;
289 let mut storage = Self {
290 conn,
291 metadata_cache: RefCell::new(HashMap::new()),
292 };
293 storage.initialize()?;
294 Ok(storage)
295 }
296
297 pub fn memory() -> Result<Self> {
299 let conn = Connection::open_in_memory()?;
300 let mut storage = Self {
301 conn,
302 metadata_cache: RefCell::new(HashMap::new()),
303 };
304 storage.initialize()?;
305 Ok(storage)
306 }
307
308 fn initialize(&mut self) -> Result<()> {
310 self.conn.pragma_update(None, "journal_mode", "WAL")?;
312
313 self.conn.create_scalar_function(
316 "fnv1a_hash",
317 1,
318 rusqlite::functions::FunctionFlags::SQLITE_DETERMINISTIC
319 | rusqlite::functions::FunctionFlags::SQLITE_UTF8,
320 |ctx: &rusqlite::functions::Context| -> rusqlite::Result<i64> {
321 let pk_ref = ctx.get_raw(0);
322 let pk_bytes = match pk_ref {
323 rusqlite::types::ValueRef::Text(bytes) => bytes,
324 _ => {
325 return Err(rusqlite::Error::InvalidFunctionParameterType(
326 0,
327 rusqlite::types::Type::Text,
328 ));
329 }
330 };
331 let mut hash: u32 = 2166136261;
332 for &byte in pk_bytes {
333 hash ^= byte as u32;
334 hash = hash.wrapping_mul(16777619);
335 }
336 Ok(hash as i64)
337 },
338 )?;
339
340 self.conn.execute_batch(
342 "CREATE TABLE IF NOT EXISTS _config (
343 key TEXT PRIMARY KEY,
344 value TEXT NOT NULL
345 );
346
347 CREATE TABLE IF NOT EXISTS _tables (
348 table_name TEXT PRIMARY KEY,
349 key_schema TEXT NOT NULL,
350 attribute_definitions TEXT NOT NULL,
351 gsi_definitions TEXT,
352 lsi_definitions TEXT,
353 stream_enabled INTEGER DEFAULT 0,
354 stream_view_type TEXT,
355 stream_label TEXT,
356 ttl_attribute TEXT,
357 ttl_enabled INTEGER DEFAULT 0,
358 created_at INTEGER NOT NULL,
359 table_status TEXT NOT NULL DEFAULT 'ACTIVE',
360 billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
361 provisioned_throughput TEXT,
362 tags TEXT,
363 sse_specification TEXT,
364 table_class TEXT,
365 deletion_protection_enabled INTEGER DEFAULT 0
366 );
367
368 CREATE TABLE IF NOT EXISTS _stream_records (
369 id INTEGER PRIMARY KEY AUTOINCREMENT,
370 table_name TEXT NOT NULL,
371 event_name TEXT NOT NULL,
372 keys_json TEXT NOT NULL,
373 new_image TEXT,
374 old_image TEXT,
375 sequence_number TEXT NOT NULL,
376 shard_id TEXT NOT NULL,
377 created_at INTEGER NOT NULL,
378 user_identity TEXT
379 );",
380 )?;
381
382 let _ = self
384 .conn
385 .execute_batch("ALTER TABLE _stream_records ADD COLUMN user_identity TEXT");
386
387 self.conn.execute(
389 "INSERT OR IGNORE INTO _config (key, value) VALUES ('schema_version', ?1)",
390 params![SCHEMA_VERSION],
391 )?;
392
393 let version: i32 = self
395 .conn
396 .query_row(
397 "SELECT value FROM _config WHERE key = 'schema_version'",
398 [],
399 |r| r.get::<_, String>(0),
400 )
401 .unwrap_or_else(|_| "1".to_string())
402 .parse()
403 .unwrap_or(1);
404
405 if version < 2 {
406 self.migrate_v1_to_v2()?;
407 }
408 if version < 3 {
409 self.migrate_v2_to_v3()?;
410 }
411 if version < 4 {
412 self.migrate_v3_to_v4()?;
413 }
414 if version < 5 {
415 self.migrate_v4_to_v5()?;
416 }
417 if version < 6 {
418 self.migrate_v5_to_v6()?;
419 }
420
421 Ok(())
422 }
423
424 fn migrate_v1_to_v2(&self) -> Result<()> {
426 let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
427 let table_names: Vec<String> = stmt
428 .query_map([], |row| row.get(0))?
429 .collect::<std::result::Result<Vec<_>, _>>()?;
430
431 for table_name in &table_names {
432 let escaped = format!("\"{}\"", table_name.replace('"', "\"\""));
433 let _ = self.conn.execute(
434 &format!("ALTER TABLE {escaped} ADD COLUMN cached_at REAL"),
435 [],
436 );
437 }
438
439 self.conn.execute(
440 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '2')",
441 [],
442 )?;
443
444 Ok(())
445 }
446
447 fn migrate_v2_to_v3(&self) -> Result<()> {
449 let _ = self
450 .conn
451 .execute("ALTER TABLE _tables ADD COLUMN tags TEXT", []);
452
453 self.conn.execute(
454 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '3')",
455 [],
456 )?;
457
458 Ok(())
459 }
460
461 fn migrate_v3_to_v4(&self) -> Result<()> {
463 let _ = self
464 .conn
465 .execute("ALTER TABLE _tables ADD COLUMN sse_specification TEXT", []);
466 let _ = self
467 .conn
468 .execute("ALTER TABLE _tables ADD COLUMN table_class TEXT", []);
469 let _ = self.conn.execute(
470 "ALTER TABLE _tables ADD COLUMN deletion_protection_enabled INTEGER DEFAULT 0",
471 [],
472 );
473
474 self.conn.execute(
475 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '4')",
476 [],
477 )?;
478
479 Ok(())
480 }
481
482 fn migrate_v4_to_v5(&self) -> Result<()> {
484 let mut stmt = self
485 .conn
486 .prepare("SELECT table_name, gsi_definitions, lsi_definitions FROM _tables")?;
487 let tables: Vec<(String, Option<String>, Option<String>)> = stmt
488 .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
489 .collect::<std::result::Result<Vec<_>, _>>()?;
490
491 for (table_name, gsi_json, lsi_json) in &tables {
492 if let Some(json) = gsi_json {
493 if let Ok(gsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
494 for gsi in &gsis {
495 if let Some(idx) = gsi.get("IndexName").and_then(|v| v.as_str()) {
496 let gsi_table = escape_table_name(&format!("{table_name}::gsi::{idx}"));
497 let idx_name =
498 escape_table_name(&format!("{table_name}::gsi::{idx}::base_key"));
499 let _ = self.conn.execute_batch(&format!(
500 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{gsi_table}\" (table_pk, table_sk)"
501 ));
502 }
503 }
504 }
505 }
506 if let Some(json) = lsi_json {
507 if let Ok(lsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
508 for lsi in &lsis {
509 if let Some(idx) = lsi.get("IndexName").and_then(|v| v.as_str()) {
510 let lsi_table = escape_table_name(&format!("{table_name}::lsi::{idx}"));
511 let idx_name =
512 escape_table_name(&format!("{table_name}::lsi::{idx}::base_key"));
513 let _ = self.conn.execute_batch(&format!(
514 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{lsi_table}\" (base_pk, base_sk)"
515 ));
516 }
517 }
518 }
519 }
520 }
521
522 self.conn.execute(
523 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '5')",
524 [],
525 )?;
526
527 Ok(())
528 }
529
530 fn migrate_v5_to_v6(&self) -> Result<()> {
532 let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
533 let table_names: Vec<String> = stmt
534 .query_map([], |row| row.get(0))?
535 .collect::<std::result::Result<Vec<_>, _>>()?;
536
537 for table_name in &table_names {
538 let escaped = escape_table_name(table_name);
539 let _ = self.conn.execute(
540 &format!(
541 "ALTER TABLE \"{escaped}\" ADD COLUMN hash_prefix TEXT NOT NULL DEFAULT ''"
542 ),
543 [],
544 );
545 }
546
547 self.conn.execute(
548 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '6')",
549 [],
550 )?;
551
552 Ok(())
553 }
554
555 pub fn conn(&self) -> &Connection {
557 &self.conn
558 }
559
560 pub fn conn_mut(&mut self) -> &mut Connection {
562 &mut self.conn
563 }
564
565 pub fn insert_table_metadata(&self, m: &CreateTableMetadata) -> Result<()> {
571 let table_name = m.table_name;
572 self.conn.execute(
573 "INSERT INTO _tables (table_name, key_schema, attribute_definitions, gsi_definitions, \
574 lsi_definitions, provisioned_throughput, created_at, sse_specification, table_class, \
575 deletion_protection_enabled, billing_mode)
576 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
577 params![
578 m.table_name,
579 m.key_schema,
580 m.attribute_definitions,
581 m.gsi_definitions,
582 m.lsi_definitions,
583 m.provisioned_throughput,
584 m.created_at,
585 m.sse_specification,
586 m.table_class,
587 m.deletion_protection_enabled as i32,
588 m.billing_mode,
589 ],
590 )?;
591 self.metadata_cache.borrow_mut().remove(table_name);
592 Ok(())
593 }
594
595 pub fn get_table_metadata(&self, table_name: &str) -> Result<Option<TableMetadata>> {
601 if let Some(cached) = self.metadata_cache.borrow().get(table_name) {
603 return Ok(Some(cached.clone()));
604 }
605
606 let sql = format!("SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE table_name = ?1");
607 let mut stmt = self.conn.prepare(&sql)?;
608
609 let result = stmt.query_row(params![table_name], row_to_metadata);
610
611 match result {
612 Ok(meta) => {
613 self.metadata_cache
614 .borrow_mut()
615 .insert(table_name.to_string(), meta.clone());
616 Ok(Some(meta))
617 }
618 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
619 Err(e) => Err(DynoxideError::from(e)),
620 }
621 }
622
623 pub fn delete_table_metadata(&self, table_name: &str) -> Result<bool> {
625 let affected = self.conn.execute(
626 "DELETE FROM _tables WHERE table_name = ?1",
627 params![table_name],
628 )?;
629 self.metadata_cache.borrow_mut().remove(table_name);
630 Ok(affected > 0)
631 }
632
633 pub fn update_table_metadata(
635 &self,
636 table_name: &str,
637 attribute_definitions: &str,
638 gsi_definitions: Option<&str>,
639 ) -> Result<()> {
640 self.conn.execute(
641 "UPDATE _tables SET attribute_definitions = ?1, gsi_definitions = ?2 WHERE table_name = ?3",
642 params![attribute_definitions, gsi_definitions, table_name],
643 )?;
644 self.metadata_cache.borrow_mut().remove(table_name);
645 Ok(())
646 }
647
648 pub fn update_provisioned_throughput(
650 &self,
651 table_name: &str,
652 provisioned_throughput: &str,
653 ) -> Result<()> {
654 self.conn.execute(
655 "UPDATE _tables SET provisioned_throughput = ?1 WHERE table_name = ?2",
656 params![provisioned_throughput, table_name],
657 )?;
658 self.metadata_cache.borrow_mut().remove(table_name);
659 Ok(())
660 }
661
662 pub fn clear_provisioned_throughput(&self, table_name: &str) -> Result<()> {
664 self.conn.execute(
665 "UPDATE _tables SET provisioned_throughput = NULL WHERE table_name = ?1",
666 params![table_name],
667 )?;
668 self.metadata_cache.borrow_mut().remove(table_name);
669 Ok(())
670 }
671
672 pub fn update_billing_mode(&self, table_name: &str, billing_mode: &str) -> Result<()> {
674 self.conn.execute(
675 "UPDATE _tables SET billing_mode = ?1 WHERE table_name = ?2",
676 params![billing_mode, table_name],
677 )?;
678 self.metadata_cache.borrow_mut().remove(table_name);
679 Ok(())
680 }
681
682 pub fn get_tags(&self, table_name: &str) -> Result<Vec<crate::types::Tag>> {
688 let tags_json: Option<String> = self.conn.query_row(
689 "SELECT tags FROM _tables WHERE table_name = ?1",
690 params![table_name],
691 |row| row.get(0),
692 )?;
693
694 match tags_json {
695 Some(json) => serde_json::from_str(&json)
696 .map_err(|e| DynoxideError::InternalServerError(format!("Bad tags JSON: {e}"))),
697 None => Ok(Vec::new()),
698 }
699 }
700
701 pub fn set_tags(&self, table_name: &str, new_tags: &[crate::types::Tag]) -> Result<()> {
703 use std::collections::BTreeMap;
704
705 let existing = self.get_tags(table_name)?;
706 let mut tag_map: BTreeMap<String, String> =
707 existing.into_iter().map(|t| (t.key, t.value)).collect();
708
709 for tag in new_tags {
710 tag_map.insert(tag.key.clone(), tag.value.clone());
711 }
712
713 if tag_map.len() > 50 {
714 return Err(DynoxideError::ValidationException(
715 "One or more parameter values were invalid: \
716 Too many tags: tag limit is 50"
717 .to_string(),
718 ));
719 }
720
721 let merged: Vec<crate::types::Tag> = tag_map
722 .into_iter()
723 .map(|(k, v)| crate::types::Tag { key: k, value: v })
724 .collect();
725
726 let json = serde_json::to_string(&merged)
727 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
728
729 self.conn.execute(
730 "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
731 params![json, table_name],
732 )?;
733 Ok(())
734 }
735
736 pub fn update_deletion_protection(&self, table_name: &str, enabled: bool) -> Result<()> {
738 self.conn.execute(
739 "UPDATE _tables SET deletion_protection_enabled = ?1 WHERE table_name = ?2",
740 params![enabled as i32, table_name],
741 )?;
742 self.metadata_cache.borrow_mut().remove(table_name);
743 Ok(())
744 }
745
746 pub fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<()> {
748 let mut tags = self.get_tags(table_name)?;
749 tags.retain(|t| !keys.contains(&t.key));
750
751 let json = if tags.is_empty() {
752 None
753 } else {
754 Some(
755 serde_json::to_string(&tags)
756 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
757 )
758 };
759
760 self.conn.execute(
761 "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
762 params![json, table_name],
763 )?;
764 Ok(())
765 }
766
767 pub fn list_table_names(&self) -> Result<Vec<String>> {
769 let mut stmt = self
770 .conn
771 .prepare("SELECT table_name FROM _tables ORDER BY table_name")?;
772 let names = stmt
773 .query_map([], |row| row.get(0))?
774 .collect::<std::result::Result<Vec<String>, _>>()?;
775 Ok(names)
776 }
777
778 pub fn table_exists(&self, table_name: &str) -> Result<bool> {
780 let count: i32 = self.conn.query_row(
781 "SELECT COUNT(*) FROM _tables WHERE table_name = ?1",
782 params![table_name],
783 |row| row.get(0),
784 )?;
785 Ok(count > 0)
786 }
787
788 #[allow(dead_code)]
790 pub(crate) fn invalidate_metadata_cache(&self, table_name: &str) {
791 self.metadata_cache.borrow_mut().remove(table_name);
792 }
793
794 pub fn create_data_table(&self, table_name: &str) -> Result<()> {
800 let sql = format!(
801 "CREATE TABLE \"{}\" (
802 pk TEXT NOT NULL,
803 sk TEXT NOT NULL DEFAULT '',
804 item_json TEXT NOT NULL,
805 item_size INTEGER NOT NULL,
806 cached_at REAL,
807 hash_prefix TEXT NOT NULL DEFAULT '',
808 PRIMARY KEY (pk, sk)
809 )",
810 escape_table_name(table_name)
811 );
812 self.conn.execute(&sql, [])?;
813 Ok(())
814 }
815
816 pub fn drop_data_table(&self, table_name: &str) -> Result<()> {
818 let sql = format!("DROP TABLE IF EXISTS \"{}\"", escape_table_name(table_name));
819 self.conn.execute(&sql, [])?;
820 Ok(())
821 }
822
823 pub fn create_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
825 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
826 let escaped = escape_table_name(&gsi_table_name);
827 let sql = format!(
828 "CREATE TABLE \"{escaped}\" (
829 gsi_pk TEXT NOT NULL,
830 gsi_sk TEXT NOT NULL DEFAULT '',
831 table_pk TEXT NOT NULL,
832 table_sk TEXT NOT NULL DEFAULT '',
833 item_json TEXT NOT NULL,
834 PRIMARY KEY (gsi_pk, gsi_sk, table_pk, table_sk)
835 )"
836 );
837 self.conn.execute(&sql, [])?;
838
839 let idx_name = escape_table_name(&format!("{gsi_table_name}::base_key"));
840 self.conn.execute_batch(&format!(
841 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{escaped}\" (table_pk, table_sk)"
842 ))?;
843 Ok(())
844 }
845
846 pub fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
848 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
849 let sql = format!(
850 "DROP TABLE IF EXISTS \"{}\"",
851 escape_table_name(&gsi_table_name)
852 );
853 self.conn.execute(&sql, [])?;
854 Ok(())
855 }
856
857 #[allow(clippy::too_many_arguments)]
863 pub fn insert_gsi_item(
864 &self,
865 table_name: &str,
866 index_name: &str,
867 gsi_pk: &str,
868 gsi_sk: &str,
869 table_pk: &str,
870 table_sk: &str,
871 item_json: &str,
872 ) -> Result<()> {
873 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
874 let sql = format!(
875 "INSERT OR REPLACE INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
876 escape_table_name(&gsi_table_name)
877 );
878 self.conn
879 .prepare_cached(&sql)?
880 .execute(params![gsi_pk, gsi_sk, table_pk, table_sk, item_json])?;
881 Ok(())
882 }
883
884 pub fn delete_gsi_item(
886 &self,
887 table_name: &str,
888 index_name: &str,
889 table_pk: &str,
890 table_sk: &str,
891 ) -> Result<()> {
892 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
893 let sql = format!(
894 "DELETE FROM \"{}\" WHERE table_pk = ?1 AND table_sk = ?2",
895 escape_table_name(&gsi_table_name)
896 );
897 self.conn
898 .prepare_cached(&sql)?
899 .execute(params![table_pk, table_sk])?;
900 Ok(())
901 }
902
903 pub fn query_gsi_items(
905 &self,
906 table_name: &str,
907 index_name: &str,
908 gsi_pk: &str,
909 params: &QueryParams,
910 ) -> Result<Vec<(String, String, String)>> {
911 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
912 let mut sql = format!(
913 "SELECT gsi_pk, gsi_sk, item_json FROM \"{}\" WHERE gsi_pk = ?1",
914 escape_table_name(&gsi_table_name)
915 );
916
917 let mut param_idx = 2;
918 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> =
919 vec![Box::new(gsi_pk.to_string())];
920
921 if let Some(cond) = params.sk_condition {
922 let gsi_cond = cond.replace(" sk ", " gsi_sk ").replace(" sk>", " gsi_sk>");
929 sql.push(' ');
930 sql.push_str(&gsi_cond);
931 for &p in params.sk_params {
932 all_params.push(Box::new(p.to_string()));
933 param_idx += 1;
934 }
935 }
936
937 if let (Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
940 params.exclusive_start_sk,
941 params.exclusive_start_base_pk,
942 params.exclusive_start_base_sk,
943 ) {
944 let op = if params.forward { ">" } else { "<" };
945 sql.push_str(&format!(
946 " AND (gsi_sk, table_pk, table_sk) {op} (?{}, ?{}, ?{})",
947 param_idx,
948 param_idx + 1,
949 param_idx + 2
950 ));
951 all_params.push(Box::new(start_sk.to_string()));
952 all_params.push(Box::new(start_base_pk.to_string()));
953 all_params.push(Box::new(start_base_sk.to_string()));
954 } else if let Some(start_sk) = params.exclusive_start_sk {
955 if params.forward {
956 sql.push_str(&format!(" AND gsi_sk > ?{param_idx}"));
957 } else {
958 sql.push_str(&format!(" AND gsi_sk < ?{param_idx}"));
959 }
960 all_params.push(Box::new(start_sk.to_string()));
961 }
962
963 sql.push_str(if params.forward {
964 " ORDER BY gsi_sk ASC, table_pk ASC, table_sk ASC"
965 } else {
966 " ORDER BY gsi_sk DESC, table_pk DESC, table_sk DESC"
967 });
968
969 if let Some(lim) = params.limit {
970 sql.push_str(&format!(" LIMIT {lim}"));
971 }
972
973 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
974 all_params.iter().map(|p| p.as_ref()).collect();
975 let mut stmt = self.conn.prepare(&sql)?;
976 let rows = stmt
977 .query_map(param_refs.as_slice(), |row| {
978 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
979 })?
980 .collect::<std::result::Result<Vec<_>, _>>()?;
981
982 Ok(rows)
983 }
984
985 pub fn scan_gsi_items(
987 &self,
988 table_name: &str,
989 index_name: &str,
990 params: &ScanParams,
991 ) -> Result<Vec<(String, String, String)>> {
992 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
993 let mut sql = format!(
994 "SELECT gsi_pk, gsi_sk, item_json FROM \"{}\"",
995 escape_table_name(&gsi_table_name)
996 );
997
998 let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
999 let mut where_clauses = Vec::new();
1000 let mut param_idx = 1;
1001
1002 if let (Some(start_pk), Some(start_sk)) =
1003 (params.exclusive_start_pk, params.exclusive_start_sk)
1004 {
1005 where_clauses.push(format!(
1006 "(gsi_pk, gsi_sk) > (?{}, ?{})",
1007 param_idx,
1008 param_idx + 1
1009 ));
1010 params_vec.push(Box::new(start_pk.to_string()));
1011 params_vec.push(Box::new(start_sk.to_string()));
1012 param_idx += 2;
1013 }
1014
1015 if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
1017 where_clauses.push(format!(
1018 "(fnv1a_hash(table_pk) % ?{}) = ?{}",
1019 param_idx,
1020 param_idx + 1
1021 ));
1022 params_vec.push(Box::new(total as i64));
1023 params_vec.push(Box::new(seg as i64));
1024 }
1025
1026 if !where_clauses.is_empty() {
1027 sql.push_str(" WHERE ");
1028 sql.push_str(&where_clauses.join(" AND "));
1029 }
1030
1031 sql.push_str(" ORDER BY gsi_pk ASC, gsi_sk ASC");
1032
1033 if let Some(lim) = params.limit {
1034 sql.push_str(&format!(" LIMIT {lim}"));
1035 }
1036
1037 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1038 params_vec.iter().map(|p| p.as_ref()).collect();
1039 let mut stmt = self.conn.prepare(&sql)?;
1040 let rows = stmt
1041 .query_map(param_refs.as_slice(), |row| {
1042 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1043 })?
1044 .collect::<std::result::Result<Vec<_>, _>>()?;
1045
1046 Ok(rows)
1047 }
1048
1049 pub fn create_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1055 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1056 let escaped = escape_table_name(&lsi_table_name);
1057 let sql = format!(
1058 "CREATE TABLE \"{escaped}\" (
1059 pk TEXT NOT NULL,
1060 sk TEXT NOT NULL DEFAULT '',
1061 base_pk TEXT NOT NULL,
1062 base_sk TEXT NOT NULL DEFAULT '',
1063 item_json TEXT NOT NULL,
1064 PRIMARY KEY (pk, sk, base_pk, base_sk)
1065 )"
1066 );
1067 self.conn.execute(&sql, [])?;
1068
1069 let idx_name = escape_table_name(&format!("{lsi_table_name}::base_key"));
1070 self.conn.execute_batch(&format!(
1071 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{escaped}\" (base_pk, base_sk)"
1072 ))?;
1073 Ok(())
1074 }
1075
1076 pub fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1078 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1079 let sql = format!(
1080 "DROP TABLE IF EXISTS \"{}\"",
1081 escape_table_name(&lsi_table_name)
1082 );
1083 self.conn.execute(&sql, [])?;
1084 Ok(())
1085 }
1086
1087 #[allow(clippy::too_many_arguments)]
1093 pub fn insert_lsi_item(
1094 &self,
1095 table_name: &str,
1096 index_name: &str,
1097 pk: &str,
1098 sk: &str,
1099 base_pk: &str,
1100 base_sk: &str,
1101 item_json: &str,
1102 ) -> Result<()> {
1103 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1104 let sql = format!(
1105 "INSERT OR REPLACE INTO \"{}\" (pk, sk, base_pk, base_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
1106 escape_table_name(&lsi_table_name)
1107 );
1108 self.conn
1109 .prepare_cached(&sql)?
1110 .execute(params![pk, sk, base_pk, base_sk, item_json])?;
1111 Ok(())
1112 }
1113
1114 pub fn delete_lsi_item(
1116 &self,
1117 table_name: &str,
1118 index_name: &str,
1119 base_pk: &str,
1120 base_sk: &str,
1121 ) -> Result<()> {
1122 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1123 let sql = format!(
1124 "DELETE FROM \"{}\" WHERE base_pk = ?1 AND base_sk = ?2",
1125 escape_table_name(&lsi_table_name)
1126 );
1127 self.conn
1128 .prepare_cached(&sql)?
1129 .execute(params![base_pk, base_sk])?;
1130 Ok(())
1131 }
1132
1133 pub fn query_lsi_items(
1135 &self,
1136 table_name: &str,
1137 index_name: &str,
1138 pk: &str,
1139 params: &QueryParams,
1140 ) -> Result<Vec<(String, String, String)>> {
1141 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1142 let mut sql = format!(
1143 "SELECT pk, sk, item_json FROM \"{}\" WHERE pk = ?1",
1144 escape_table_name(&lsi_table_name)
1145 );
1146
1147 let mut param_idx = 2;
1148 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(pk.to_string())];
1149
1150 if let Some(cond) = params.sk_condition {
1151 sql.push(' ');
1152 sql.push_str(cond);
1153 for &p in params.sk_params {
1154 all_params.push(Box::new(p.to_string()));
1155 param_idx += 1;
1156 }
1157 }
1158
1159 if let (Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
1162 params.exclusive_start_sk,
1163 params.exclusive_start_base_pk,
1164 params.exclusive_start_base_sk,
1165 ) {
1166 let op = if params.forward { ">" } else { "<" };
1167 sql.push_str(&format!(
1168 " AND (sk, base_pk, base_sk) {op} (?{}, ?{}, ?{})",
1169 param_idx,
1170 param_idx + 1,
1171 param_idx + 2
1172 ));
1173 all_params.push(Box::new(start_sk.to_string()));
1174 all_params.push(Box::new(start_base_pk.to_string()));
1175 all_params.push(Box::new(start_base_sk.to_string()));
1176 } else if let Some(start_sk) = params.exclusive_start_sk {
1177 if params.forward {
1178 sql.push_str(&format!(" AND sk > ?{param_idx}"));
1179 } else {
1180 sql.push_str(&format!(" AND sk < ?{param_idx}"));
1181 }
1182 all_params.push(Box::new(start_sk.to_string()));
1183 }
1184
1185 sql.push_str(if params.forward {
1186 " ORDER BY sk ASC, base_pk ASC, base_sk ASC"
1187 } else {
1188 " ORDER BY sk DESC, base_pk DESC, base_sk DESC"
1189 });
1190
1191 if let Some(lim) = params.limit {
1192 sql.push_str(&format!(" LIMIT {lim}"));
1193 }
1194
1195 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1196 all_params.iter().map(|p| p.as_ref()).collect();
1197 let mut stmt = self.conn.prepare(&sql)?;
1198 let rows = stmt
1199 .query_map(param_refs.as_slice(), |row| {
1200 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1201 })?
1202 .collect::<std::result::Result<Vec<_>, _>>()?;
1203
1204 Ok(rows)
1205 }
1206
1207 pub fn scan_lsi_items(
1209 &self,
1210 table_name: &str,
1211 index_name: &str,
1212 params: &ScanParams,
1213 ) -> Result<Vec<(String, String, String)>> {
1214 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1215 let mut sql = format!(
1216 "SELECT pk, sk, item_json FROM \"{}\"",
1217 escape_table_name(&lsi_table_name)
1218 );
1219
1220 let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1221 let mut where_clauses = Vec::new();
1222 let mut param_idx = 1;
1223
1224 if let (Some(start_pk), Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
1226 params.exclusive_start_pk,
1227 params.exclusive_start_sk,
1228 params.exclusive_start_base_pk,
1229 params.exclusive_start_base_sk,
1230 ) {
1231 where_clauses.push(format!(
1232 "(pk, sk, base_pk, base_sk) > (?{}, ?{}, ?{}, ?{})",
1233 param_idx,
1234 param_idx + 1,
1235 param_idx + 2,
1236 param_idx + 3
1237 ));
1238 params_vec.push(Box::new(start_pk.to_string()));
1239 params_vec.push(Box::new(start_sk.to_string()));
1240 params_vec.push(Box::new(start_base_pk.to_string()));
1241 params_vec.push(Box::new(start_base_sk.to_string()));
1242 param_idx += 4;
1243 } else if let (Some(start_pk), Some(start_sk)) =
1244 (params.exclusive_start_pk, params.exclusive_start_sk)
1245 {
1246 where_clauses.push(format!("(pk, sk) > (?{}, ?{})", param_idx, param_idx + 1));
1247 params_vec.push(Box::new(start_pk.to_string()));
1248 params_vec.push(Box::new(start_sk.to_string()));
1249 param_idx += 2;
1250 }
1251
1252 if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
1254 where_clauses.push(format!(
1255 "(fnv1a_hash(base_pk) % ?{}) = ?{}",
1256 param_idx,
1257 param_idx + 1
1258 ));
1259 params_vec.push(Box::new(total as i64));
1260 params_vec.push(Box::new(seg as i64));
1261 }
1262
1263 if !where_clauses.is_empty() {
1264 sql.push_str(" WHERE ");
1265 sql.push_str(&where_clauses.join(" AND "));
1266 }
1267
1268 sql.push_str(" ORDER BY pk ASC, sk ASC");
1269
1270 if let Some(lim) = params.limit {
1271 sql.push_str(&format!(" LIMIT {lim}"));
1272 }
1273
1274 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1275 params_vec.iter().map(|p| p.as_ref()).collect();
1276 let mut stmt = self.conn.prepare(&sql)?;
1277 let rows = stmt
1278 .query_map(param_refs.as_slice(), |row| {
1279 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1280 })?
1281 .collect::<std::result::Result<Vec<_>, _>>()?;
1282
1283 Ok(rows)
1284 }
1285
1286 pub fn begin_transaction(&self) -> Result<()> {
1292 self.conn.execute_batch("BEGIN IMMEDIATE")?;
1293 Ok(())
1294 }
1295
1296 pub fn commit(&self) -> Result<()> {
1298 self.conn.execute_batch("COMMIT")?;
1299 Ok(())
1300 }
1301
1302 pub fn rollback(&self) -> Result<()> {
1304 self.conn.execute_batch("ROLLBACK")?;
1305 Ok(())
1306 }
1307
1308 pub fn enable_bulk_loading(&self) -> Result<()> {
1314 self.conn.execute_batch(
1315 "PRAGMA synchronous = OFF;
1316 PRAGMA cache_size = -64000;
1317 PRAGMA temp_store = MEMORY;
1318 PRAGMA mmap_size = 268435456;",
1319 )?;
1320 Ok(())
1321 }
1322
1323 pub fn disable_bulk_loading(&self) -> Result<()> {
1325 self.conn.execute_batch(
1326 "PRAGMA synchronous = NORMAL;
1327 PRAGMA cache_size = -2000;
1328 PRAGMA temp_store = DEFAULT;
1329 PRAGMA mmap_size = 0;",
1330 )?;
1331 Ok(())
1332 }
1333
1334 pub fn put_item(
1340 &self,
1341 table_name: &str,
1342 pk: &str,
1343 sk: &str,
1344 item_json: &str,
1345 item_size: usize,
1346 ) -> Result<Option<String>> {
1347 self.put_item_with_hash(table_name, pk, sk, item_json, item_size, "")
1348 }
1349
1350 pub fn put_item_with_hash(
1352 &self,
1353 table_name: &str,
1354 pk: &str,
1355 sk: &str,
1356 item_json: &str,
1357 item_size: usize,
1358 hash_prefix: &str,
1359 ) -> Result<Option<String>> {
1360 let old_item = self.get_item(table_name, pk, sk)?;
1362
1363 let escaped = escape_table_name(table_name);
1364 let sql = format!(
1365 "INSERT OR REPLACE INTO \"{escaped}\" (pk, sk, item_json, item_size, cached_at, hash_prefix) \
1366 VALUES (?1, ?2, ?3, ?4, \
1367 (SELECT cached_at FROM \"{escaped}\" WHERE pk = ?1 AND sk = ?2), ?5)"
1368 );
1369 self.conn.execute(
1370 &sql,
1371 params![pk, sk, item_json, item_size as i64, hash_prefix],
1372 )?;
1373
1374 Ok(old_item)
1375 }
1376
1377 pub fn get_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1379 let sql = format!(
1380 "SELECT item_json FROM \"{}\" WHERE pk = ?1 AND sk = ?2",
1381 escape_table_name(table_name)
1382 );
1383 let result = self.conn.query_row(&sql, params![pk, sk], |row| row.get(0));
1384
1385 match result {
1386 Ok(json) => Ok(Some(json)),
1387 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1388 Err(e) => Err(DynoxideError::from(e)),
1389 }
1390 }
1391
1392 pub fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64> {
1394 let sql = format!(
1395 "SELECT COALESCE(SUM(item_size), 0) FROM \"{}\" WHERE pk = ?1",
1396 escape_table_name(table_name)
1397 );
1398 let size: i64 = self.conn.query_row(&sql, params![pk], |row| row.get(0))?;
1399 Ok(size)
1400 }
1401
1402 pub fn get_lsi_partition_size(
1405 &self,
1406 table_name: &str,
1407 index_name: &str,
1408 pk: &str,
1409 ) -> Result<i64> {
1410 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1411 let sql = format!(
1412 "SELECT COALESCE(SUM(length(item_json)), 0) FROM \"{}\" WHERE pk = ?1",
1413 escape_table_name(&lsi_table_name)
1414 );
1415 let size: i64 = self.conn.query_row(&sql, params![pk], |row| row.get(0))?;
1416 Ok(size)
1417 }
1418
1419 pub fn delete_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1421 let old_item = self.get_item(table_name, pk, sk)?;
1422
1423 let sql = format!(
1424 "DELETE FROM \"{}\" WHERE pk = ?1 AND sk = ?2",
1425 escape_table_name(table_name)
1426 );
1427 self.conn.execute(&sql, params![pk, sk])?;
1428
1429 Ok(old_item)
1430 }
1431
1432 pub fn query_items(
1437 &self,
1438 table_name: &str,
1439 pk: &str,
1440 params: &QueryParams,
1441 ) -> Result<Vec<(String, String, String)>> {
1442 let mut sql = format!(
1443 "SELECT pk, sk, item_json FROM \"{}\" WHERE pk = ?1",
1444 escape_table_name(table_name)
1445 );
1446
1447 let mut param_idx = 2;
1448 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(pk.to_string())];
1449
1450 if let Some(cond) = params.sk_condition {
1451 sql.push(' ');
1452 sql.push_str(cond);
1453 for &p in params.sk_params {
1454 all_params.push(Box::new(p.to_string()));
1455 param_idx += 1;
1456 }
1457 }
1458
1459 if let Some(start_sk) = params.exclusive_start_sk {
1460 if params.forward {
1461 sql.push_str(&format!(" AND sk > ?{param_idx}"));
1462 } else {
1463 sql.push_str(&format!(" AND sk < ?{param_idx}"));
1464 }
1465 all_params.push(Box::new(start_sk.to_string()));
1466 }
1467
1468 sql.push_str(if params.forward {
1469 " ORDER BY sk ASC"
1470 } else {
1471 " ORDER BY sk DESC"
1472 });
1473
1474 if let Some(lim) = params.limit {
1475 sql.push_str(&format!(" LIMIT {lim}"));
1476 }
1477
1478 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1479 all_params.iter().map(|p| p.as_ref()).collect();
1480 let mut stmt = self.conn.prepare(&sql)?;
1481 let rows = stmt
1482 .query_map(param_refs.as_slice(), |row| {
1483 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1484 })?
1485 .collect::<std::result::Result<Vec<_>, _>>()?;
1486
1487 Ok(rows)
1488 }
1489
1490 pub fn scan_items(
1495 &self,
1496 table_name: &str,
1497 params: &ScanParams,
1498 ) -> Result<Vec<(String, String, String)>> {
1499 let mut sql = format!(
1500 "SELECT pk, sk, item_json FROM \"{}\"",
1501 escape_table_name(table_name)
1502 );
1503
1504 let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1505 let mut where_clauses = Vec::new();
1506 let mut param_idx = 1;
1507
1508 let is_parallel = params.segment.is_some() && params.total_segments.is_some();
1510
1511 if let (Some(start_pk), Some(start_sk)) =
1512 (params.exclusive_start_pk, params.exclusive_start_sk)
1513 {
1514 if is_parallel {
1515 where_clauses.push(format!(
1517 "(hash_prefix, pk, sk) > ((SELECT hash_prefix FROM \"{}\" WHERE pk = ?{} AND sk = ?{} LIMIT 1), ?{}, ?{})",
1518 escape_table_name(table_name),
1519 param_idx, param_idx + 1,
1520 param_idx, param_idx + 1
1521 ));
1522 } else {
1523 where_clauses.push(format!("(pk, sk) > (?{}, ?{})", param_idx, param_idx + 1));
1524 }
1525 params_vec.push(Box::new(start_pk.to_string()));
1526 params_vec.push(Box::new(start_sk.to_string()));
1527 param_idx += 2;
1528 }
1529
1530 if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
1531 let start_bucket = ceiling_div(HASH_BUCKETS * seg, total);
1535 let end_bucket = ceiling_div(HASH_BUCKETS * (seg + 1), total) - 1;
1536 let start_hex = format!("{:03x}", start_bucket);
1537 let end_hex = format!("{:03x}", end_bucket);
1538 where_clauses.push(format!(
1540 "substr(hash_prefix, 1, 3) >= ?{} AND substr(hash_prefix, 1, 3) <= ?{}",
1541 param_idx,
1542 param_idx + 1
1543 ));
1544 params_vec.push(Box::new(start_hex));
1545 params_vec.push(Box::new(end_hex));
1546 }
1547
1548 if !where_clauses.is_empty() {
1549 sql.push_str(" WHERE ");
1550 sql.push_str(&where_clauses.join(" AND "));
1551 }
1552
1553 if is_parallel {
1556 sql.push_str(" ORDER BY hash_prefix ASC, pk ASC, sk ASC");
1557 } else {
1558 sql.push_str(" ORDER BY pk ASC, sk ASC");
1559 }
1560
1561 if let Some(lim) = params.limit {
1562 sql.push_str(&format!(" LIMIT {lim}"));
1563 }
1564
1565 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1566 params_vec.iter().map(|p| p.as_ref()).collect();
1567 let mut stmt = self.conn.prepare(&sql)?;
1568 let rows = stmt
1569 .query_map(param_refs.as_slice(), |row| {
1570 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1571 })?
1572 .collect::<std::result::Result<Vec<_>, _>>()?;
1573
1574 Ok(rows)
1575 }
1576
1577 pub fn count_items(&self, table_name: &str) -> Result<i64> {
1579 let sql = format!("SELECT COUNT(*) FROM \"{}\"", escape_table_name(table_name));
1580 let count: i64 = self.conn.query_row(&sql, [], |row| row.get(0))?;
1581 Ok(count)
1582 }
1583
1584 pub fn db_path(&self) -> Option<String> {
1590 self.conn
1591 .path()
1592 .filter(|p| !p.is_empty())
1593 .map(|p| p.to_owned())
1594 }
1595
1596 pub fn db_size_bytes(&self) -> Result<u64> {
1598 let size: i64 = self.conn.query_row(
1599 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1600 [],
1601 |row| row.get(0),
1602 )?;
1603 Ok(size as u64)
1604 }
1605
1606 pub fn table_count(&self) -> Result<usize> {
1608 let count: i64 = self
1609 .conn
1610 .query_row("SELECT COUNT(*) FROM _tables", [], |row| row.get(0))?;
1611 Ok(count as usize)
1612 }
1613
1614 pub fn table_stats(&self) -> Result<Vec<TableStats>> {
1618 let table_names = self.list_table_names()?;
1619 let mut stats = Vec::with_capacity(table_names.len());
1620 for name in table_names {
1621 let sql = format!(
1622 "SELECT COUNT(*), COALESCE(SUM(item_size), 0) FROM \"{}\"",
1623 escape_table_name(&name)
1624 );
1625 let (item_count, size_bytes): (i64, i64) = self
1626 .conn
1627 .query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
1628 stats.push(TableStats {
1629 table_name: name,
1630 item_count,
1631 size_bytes: size_bytes as u64,
1632 });
1633 }
1634 Ok(stats)
1635 }
1636
1637 pub fn database_info(&self) -> Result<DatabaseInfo> {
1642 let path = self.db_path();
1643 let size_bytes = self.db_size_bytes()?;
1644 let table_count = self.table_count()?;
1645 let stats = self.table_stats()?;
1646
1647 let mut table_details = Vec::with_capacity(stats.len());
1648 for s in stats {
1649 let metadata = self.get_table_metadata(&s.table_name)?;
1650 table_details.push(TableInfoEntry { stats: s, metadata });
1651 }
1652
1653 Ok(DatabaseInfo {
1654 path,
1655 size_bytes,
1656 table_count,
1657 tables: table_details,
1658 })
1659 }
1660
1661 pub fn vacuum_into(&self, path: &str) -> Result<()> {
1668 if path.contains('\0') {
1669 return Err(DynoxideError::ValidationException(
1670 "path contains null byte".to_string(),
1671 ));
1672 }
1673 self.conn
1674 .execute_batch(&format!("VACUUM INTO '{}'", path.replace('\'', "''")))?;
1675 Ok(())
1676 }
1677
1678 pub fn vacuum(&self) -> Result<()> {
1680 self.conn.execute_batch("VACUUM")?;
1681 Ok(())
1682 }
1683
1684 pub fn restore_from(&mut self, path: &str) -> Result<()> {
1688 let source = Connection::open(path)?;
1689 self.restore_from_connection(&source)
1690 }
1691
1692 pub fn backup_to_memory(&self) -> Result<Connection> {
1697 let mut dest = Connection::open_in_memory()?;
1698 {
1699 let backup = rusqlite::backup::Backup::new(&self.conn, &mut dest)?;
1700 backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1701 }
1702 Ok(dest)
1703 }
1704
1705 pub fn restore_from_connection(&mut self, source: &Connection) -> Result<()> {
1710 let backup = rusqlite::backup::Backup::new(source, &mut self.conn)?;
1711 backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1712 self.metadata_cache.borrow_mut().clear();
1713 Ok(())
1714 }
1715
1716 pub fn connection_size_bytes(conn: &Connection) -> Result<u64> {
1718 let size: i64 = conn.query_row(
1719 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1720 [],
1721 |row| row.get(0),
1722 )?;
1723 Ok(size as u64)
1724 }
1725
1726 pub fn enable_stream(&self, table_name: &str, view_type: &str, label: &str) -> Result<()> {
1732 self.conn.execute(
1733 "UPDATE _tables SET stream_enabled = 1, stream_view_type = ?1, stream_label = ?2 WHERE table_name = ?3",
1734 params![view_type, label, table_name],
1735 )?;
1736 self.metadata_cache.borrow_mut().remove(table_name);
1737 Ok(())
1738 }
1739
1740 pub fn disable_stream(&self, table_name: &str) -> Result<()> {
1742 self.conn.execute(
1743 "UPDATE _tables SET stream_enabled = 0 WHERE table_name = ?1",
1744 params![table_name],
1745 )?;
1746 self.metadata_cache.borrow_mut().remove(table_name);
1747 Ok(())
1748 }
1749
1750 #[allow(clippy::too_many_arguments)]
1752 pub fn insert_stream_record(
1753 &self,
1754 table_name: &str,
1755 event_name: &str,
1756 keys_json: &str,
1757 new_image: Option<&str>,
1758 old_image: Option<&str>,
1759 sequence_number: &str,
1760 shard_id: &str,
1761 created_at: i64,
1762 ) -> Result<()> {
1763 self.insert_stream_record_with_identity(
1764 table_name,
1765 event_name,
1766 keys_json,
1767 new_image,
1768 old_image,
1769 sequence_number,
1770 shard_id,
1771 created_at,
1772 None,
1773 )
1774 }
1775
1776 #[allow(clippy::too_many_arguments)]
1778 pub fn insert_stream_record_with_identity(
1779 &self,
1780 table_name: &str,
1781 event_name: &str,
1782 keys_json: &str,
1783 new_image: Option<&str>,
1784 old_image: Option<&str>,
1785 sequence_number: &str,
1786 shard_id: &str,
1787 created_at: i64,
1788 user_identity: Option<&str>,
1789 ) -> Result<()> {
1790 self.conn.execute(
1791 "INSERT INTO _stream_records (table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity)
1792 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1793 params![table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity],
1794 )?;
1795 Ok(())
1796 }
1797
1798 pub fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64> {
1800 let result: std::result::Result<i64, _> = self.conn.query_row(
1801 "SELECT COALESCE(MAX(CAST(sequence_number AS INTEGER)), 0) + 1 FROM _stream_records WHERE table_name = ?1",
1802 params![table_name],
1803 |row| row.get(0),
1804 );
1805 match result {
1806 Ok(n) => Ok(n),
1807 Err(_) => Ok(1),
1808 }
1809 }
1810
1811 pub fn get_stream_records(
1813 &self,
1814 table_name: &str,
1815 shard_id: &str,
1816 after_sequence: i64,
1817 limit: usize,
1818 ) -> Result<Vec<StreamRecord>> {
1819 let mut stmt = self.conn.prepare(
1820 "SELECT event_name, keys_json, new_image, old_image, sequence_number, created_at, user_identity
1821 FROM _stream_records
1822 WHERE table_name = ?1 AND shard_id = ?2 AND CAST(sequence_number AS INTEGER) > ?3
1823 ORDER BY CAST(sequence_number AS INTEGER) ASC
1824 LIMIT ?4",
1825 )?;
1826 let rows = stmt
1827 .query_map(
1828 params![table_name, shard_id, after_sequence, limit as i64],
1829 |row| {
1830 Ok(StreamRecord {
1831 event_name: row.get(0)?,
1832 keys_json: row.get(1)?,
1833 new_image: row.get(2)?,
1834 old_image: row.get(3)?,
1835 sequence_number: row.get(4)?,
1836 created_at: row.get(5)?,
1837 user_identity: row.get(6)?,
1838 })
1839 },
1840 )?
1841 .collect::<std::result::Result<Vec<_>, _>>()?;
1842 Ok(rows)
1843 }
1844
1845 pub fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1847 let sql = format!(
1848 "SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE stream_enabled = 1 ORDER BY table_name"
1849 );
1850 let mut stmt = self.conn.prepare(&sql)?;
1851 let rows = stmt
1852 .query_map([], row_to_metadata)?
1853 .collect::<std::result::Result<Vec<_>, _>>()?;
1854 Ok(rows)
1855 }
1856
1857 pub fn update_ttl_config(
1863 &self,
1864 table_name: &str,
1865 attribute_name: Option<&str>,
1866 enabled: bool,
1867 ) -> Result<()> {
1868 self.conn.execute(
1869 "UPDATE _tables SET ttl_attribute = ?1, ttl_enabled = ?2 WHERE table_name = ?3",
1870 params![attribute_name, enabled as i32, table_name],
1871 )?;
1872 self.metadata_cache.borrow_mut().remove(table_name);
1873 Ok(())
1874 }
1875
1876 pub fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1878 let sql = format!(
1879 "SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE ttl_enabled = 1 ORDER BY table_name"
1880 );
1881 let mut stmt = self.conn.prepare(&sql)?;
1882 let rows = stmt
1883 .query_map([], row_to_metadata)?
1884 .collect::<std::result::Result<Vec<_>, _>>()?;
1885 Ok(rows)
1886 }
1887
1888 pub fn get_shard_sequence_range(
1890 &self,
1891 table_name: &str,
1892 shard_id: &str,
1893 ) -> Result<(Option<String>, Option<String>)> {
1894 let result: std::result::Result<(Option<String>, Option<String>), _> = self.conn.query_row(
1895 "SELECT MIN(sequence_number), MAX(sequence_number) FROM _stream_records WHERE table_name = ?1 AND shard_id = ?2",
1896 params![table_name, shard_id],
1897 |row| Ok((row.get(0)?, row.get(1)?)),
1898 );
1899 match result {
1900 Ok(range) => Ok(range),
1901 Err(_) => Ok((None, None)),
1902 }
1903 }
1904
1905 pub fn touch_cached_at(
1911 &self,
1912 table_name: &str,
1913 pk: &str,
1914 sk: &str,
1915 timestamp: f64,
1916 ) -> Result<()> {
1917 let sql = format!(
1918 "UPDATE \"{}\" SET cached_at = ?1 WHERE pk = ?2 AND sk = ?3",
1919 escape_table_name(table_name)
1920 );
1921 self.conn.execute(&sql, params![timestamp, pk, sk])?;
1922 Ok(())
1923 }
1924
1925 pub fn get_lru_items(
1930 &self,
1931 table_name: &str,
1932 limit: usize,
1933 ) -> Result<Vec<(String, String, i64)>> {
1934 let sql = format!(
1935 "SELECT pk, sk, item_size FROM \"{}\" WHERE cached_at IS NOT NULL ORDER BY cached_at ASC LIMIT ?1",
1936 escape_table_name(table_name)
1937 );
1938 let mut stmt = self.conn.prepare(&sql)?;
1939 let rows = stmt
1940 .query_map(params![limit as i64], |row| {
1941 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1942 })?
1943 .collect::<std::result::Result<Vec<_>, _>>()?;
1944 Ok(rows)
1945 }
1946}
1947
1948#[derive(Debug, Clone)]
1950pub struct StreamRecord {
1951 pub event_name: String,
1952 pub keys_json: String,
1953 pub new_image: Option<String>,
1954 pub old_image: Option<String>,
1955 pub sequence_number: String,
1956 pub created_at: i64,
1957 pub user_identity: Option<String>,
1958}
1959
1960#[derive(Debug, Clone)]
1962pub struct TableStats {
1963 pub table_name: String,
1964 pub item_count: i64,
1965 pub size_bytes: u64,
1966}
1967
1968#[derive(Debug, Clone)]
1970pub struct DatabaseInfo {
1971 pub path: Option<String>,
1972 pub size_bytes: u64,
1973 pub table_count: usize,
1974 pub tables: Vec<TableInfoEntry>,
1975}
1976
1977#[derive(Debug, Clone)]
1979pub struct TableInfoEntry {
1980 pub stats: TableStats,
1981 pub metadata: Option<TableMetadata>,
1982}
1983
1984pub(crate) fn escape_table_name(name: &str) -> String {
1986 name.replace('"', "\"\"")
1987}
1988
1989#[derive(Debug, Clone)]
1995pub struct TableMetadata {
1996 pub table_name: String,
1997 pub key_schema: String,
1998 pub attribute_definitions: String,
1999 pub gsi_definitions: Option<String>,
2000 pub lsi_definitions: Option<String>,
2001 pub stream_enabled: bool,
2002 pub stream_view_type: Option<String>,
2003 pub stream_label: Option<String>,
2004 pub ttl_attribute: Option<String>,
2005 pub ttl_enabled: bool,
2006 pub created_at: i64,
2007 pub table_status: String,
2008 pub billing_mode: Option<String>,
2009 pub provisioned_throughput: Option<String>,
2010 pub sse_specification: Option<String>,
2011 pub table_class: Option<String>,
2012 pub deletion_protection_enabled: bool,
2013}
2014
2015const TABLE_METADATA_COLUMNS: &str = "table_name, key_schema, attribute_definitions, gsi_definitions, \
2017 lsi_definitions, stream_enabled, stream_view_type, stream_label, ttl_attribute, ttl_enabled, \
2018 created_at, table_status, billing_mode, provisioned_throughput, \
2019 sse_specification, table_class, deletion_protection_enabled";
2020
2021fn row_to_metadata(row: &rusqlite::Row) -> rusqlite::Result<TableMetadata> {
2023 Ok(TableMetadata {
2024 table_name: row.get(0)?,
2025 key_schema: row.get(1)?,
2026 attribute_definitions: row.get(2)?,
2027 gsi_definitions: row.get(3)?,
2028 lsi_definitions: row.get(4)?,
2029 stream_enabled: row.get::<_, i32>(5)? != 0,
2030 stream_view_type: row.get(6)?,
2031 stream_label: row.get(7)?,
2032 ttl_attribute: row.get(8)?,
2033 ttl_enabled: row.get::<_, i32>(9)? != 0,
2034 created_at: row.get(10)?,
2035 table_status: row.get(11)?,
2036 billing_mode: row.get(12)?,
2037 provisioned_throughput: row.get(13)?,
2038 sse_specification: row.get(14)?,
2039 table_class: row.get(15)?,
2040 deletion_protection_enabled: row.get::<_, i32>(16).unwrap_or(0) != 0,
2041 })
2042}
2043
2044#[cfg(test)]
2045mod tests {
2046 use super::*;
2047
2048 fn test_storage() -> Storage {
2049 Storage::memory().expect("Failed to create in-memory storage")
2050 }
2051
2052 #[test]
2053 fn test_initialize_creates_metadata_tables() {
2054 let storage = test_storage();
2055 let version: String = storage
2057 .conn()
2058 .query_row(
2059 "SELECT value FROM _config WHERE key = 'schema_version'",
2060 [],
2061 |row| row.get(0),
2062 )
2063 .unwrap();
2064 assert_eq!(version, SCHEMA_VERSION);
2065 }
2066
2067 #[test]
2068 fn test_wal_mode_enabled() {
2069 let storage = test_storage();
2070 let mode: String = storage
2071 .conn()
2072 .query_row("PRAGMA journal_mode", [], |row| row.get(0))
2073 .unwrap();
2074 assert!(mode == "wal" || mode == "memory", "Got mode: {mode}");
2076 }
2077
2078 #[test]
2079 fn test_table_metadata_crud() {
2080 let storage = test_storage();
2081
2082 assert!(!storage.table_exists("TestTable").unwrap());
2084 assert!(storage.list_table_names().unwrap().is_empty());
2085
2086 storage
2088 .insert_table_metadata(&CreateTableMetadata {
2089 table_name: "TestTable",
2090 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2091 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2092 created_at: 1000000,
2093 ..Default::default()
2094 })
2095 .unwrap();
2096
2097 assert!(storage.table_exists("TestTable").unwrap());
2098 assert_eq!(storage.list_table_names().unwrap(), vec!["TestTable"]);
2099
2100 let meta = storage.get_table_metadata("TestTable").unwrap().unwrap();
2102 assert_eq!(meta.table_name, "TestTable");
2103 assert_eq!(meta.table_status, "ACTIVE");
2104 assert_eq!(meta.created_at, 1000000);
2105
2106 assert!(storage.delete_table_metadata("TestTable").unwrap());
2108 assert!(!storage.table_exists("TestTable").unwrap());
2109 }
2110
2111 #[test]
2112 fn test_create_and_drop_data_table() {
2113 let storage = test_storage();
2114 storage.create_data_table("MyTable").unwrap();
2115
2116 storage
2118 .put_item("MyTable", "pk1", "", r#"{"pk":{"S":"pk1"}}"#, 10)
2119 .unwrap();
2120
2121 let item = storage.get_item("MyTable", "pk1", "").unwrap();
2122 assert!(item.is_some());
2123
2124 storage.drop_data_table("MyTable").unwrap();
2125 }
2126
2127 #[test]
2128 fn test_item_crud() {
2129 let storage = test_storage();
2130 storage.create_data_table("Items").unwrap();
2131
2132 let old = storage
2134 .put_item(
2135 "Items",
2136 "user#1",
2137 "profile",
2138 r#"{"name":{"S":"Alice"}}"#,
2139 20,
2140 )
2141 .unwrap();
2142 assert!(old.is_none()); let item = storage.get_item("Items", "user#1", "profile").unwrap();
2146 assert_eq!(item.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2147
2148 let old = storage
2150 .put_item("Items", "user#1", "profile", r#"{"name":{"S":"Bob"}}"#, 18)
2151 .unwrap();
2152 assert_eq!(old.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2153
2154 let deleted = storage.delete_item("Items", "user#1", "profile").unwrap();
2156 assert_eq!(deleted.unwrap(), r#"{"name":{"S":"Bob"}}"#);
2157
2158 assert!(
2160 storage
2161 .get_item("Items", "user#1", "profile")
2162 .unwrap()
2163 .is_none()
2164 );
2165 }
2166
2167 #[test]
2168 fn test_query_items() {
2169 let storage = test_storage();
2170 storage.create_data_table("Orders").unwrap();
2171
2172 for i in 1..=5 {
2174 let sk = format!("order#{i:03}");
2175 let json = format!(r#"{{"id":{{"N":"{i}"}}}}"#);
2176 storage
2177 .put_item("Orders", "user#1", &sk, &json, 10)
2178 .unwrap();
2179 }
2180
2181 let results = storage
2183 .query_items(
2184 "Orders",
2185 "user#1",
2186 &QueryParams {
2187 forward: true,
2188 ..Default::default()
2189 },
2190 )
2191 .unwrap();
2192 assert_eq!(results.len(), 5);
2193 assert_eq!(results[0].1, "order#001"); let results = storage
2197 .query_items(
2198 "Orders",
2199 "user#1",
2200 &QueryParams {
2201 forward: true,
2202 limit: Some(2),
2203 ..Default::default()
2204 },
2205 )
2206 .unwrap();
2207 assert_eq!(results.len(), 2);
2208
2209 let results = storage
2211 .query_items(
2212 "Orders",
2213 "user#1",
2214 &QueryParams {
2215 forward: false,
2216 limit: Some(2),
2217 ..Default::default()
2218 },
2219 )
2220 .unwrap();
2221 assert_eq!(results.len(), 2);
2222 assert_eq!(results[0].1, "order#005"); }
2224
2225 #[test]
2226 fn test_scan_items() {
2227 let storage = test_storage();
2228 storage.create_data_table("ScanTest").unwrap();
2229
2230 storage.put_item("ScanTest", "a", "1", r#"{}"#, 2).unwrap();
2231 storage.put_item("ScanTest", "b", "2", r#"{}"#, 2).unwrap();
2232 storage.put_item("ScanTest", "c", "3", r#"{}"#, 2).unwrap();
2233
2234 let results = storage.scan_items("ScanTest", &Default::default()).unwrap();
2235 assert_eq!(results.len(), 3);
2236
2237 let results = storage
2239 .scan_items(
2240 "ScanTest",
2241 &ScanParams {
2242 limit: Some(2),
2243 ..Default::default()
2244 },
2245 )
2246 .unwrap();
2247 assert_eq!(results.len(), 2);
2248
2249 let results = storage
2251 .scan_items(
2252 "ScanTest",
2253 &ScanParams {
2254 limit: Some(2),
2255 exclusive_start_pk: Some("a"),
2256 exclusive_start_sk: Some("1"),
2257 ..Default::default()
2258 },
2259 )
2260 .unwrap();
2261 assert_eq!(results.len(), 2);
2262 assert_eq!(results[0].0, "b"); }
2264
2265 #[test]
2266 fn test_count_items() {
2267 let storage = test_storage();
2268 storage.create_data_table("CountTest").unwrap();
2269
2270 assert_eq!(storage.count_items("CountTest").unwrap(), 0);
2271
2272 storage.put_item("CountTest", "a", "", r#"{}"#, 2).unwrap();
2273 storage.put_item("CountTest", "b", "", r#"{}"#, 2).unwrap();
2274
2275 assert_eq!(storage.count_items("CountTest").unwrap(), 2);
2276 }
2277
2278 #[test]
2279 fn test_gsi_table_lifecycle() {
2280 let storage = test_storage();
2281 storage.create_gsi_table("Orders", "ByDate").unwrap();
2282
2283 let gsi_name = "Orders::gsi::ByDate";
2285 let sql = format!(
2286 "INSERT INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
2287 gsi_name.replace('"', "\"\"")
2288 );
2289 storage
2290 .conn()
2291 .execute(
2292 &sql,
2293 params!["2024-01-01", "001", "user#1", "order#001", r#"{}"#],
2294 )
2295 .unwrap();
2296
2297 storage.drop_gsi_table("Orders", "ByDate").unwrap();
2298 }
2299
2300 #[test]
2301 fn test_nonexistent_table_metadata() {
2302 let storage = test_storage();
2303 assert!(storage.get_table_metadata("Nonexistent").unwrap().is_none());
2304 assert!(!storage.delete_table_metadata("Nonexistent").unwrap());
2305 }
2306
2307 #[test]
2308 fn test_metadata_cache_hit() {
2309 let storage = test_storage();
2310 storage
2311 .insert_table_metadata(&CreateTableMetadata {
2312 table_name: "CacheTest",
2313 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2314 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2315 created_at: 1000000,
2316 ..Default::default()
2317 })
2318 .unwrap();
2319
2320 let meta1 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2322 assert_eq!(meta1.table_name, "CacheTest");
2323
2324 let meta2 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2326 assert_eq!(meta2.table_name, "CacheTest");
2327 assert_eq!(meta1.created_at, meta2.created_at);
2328
2329 assert!(storage.metadata_cache.borrow().contains_key("CacheTest"));
2331 }
2332
2333 #[test]
2334 fn test_metadata_cache_invalidated_on_delete() {
2335 let storage = test_storage();
2336 storage
2337 .insert_table_metadata(&CreateTableMetadata {
2338 table_name: "DelCache",
2339 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2340 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2341 created_at: 1000000,
2342 ..Default::default()
2343 })
2344 .unwrap();
2345
2346 storage.get_table_metadata("DelCache").unwrap();
2348 assert!(storage.metadata_cache.borrow().contains_key("DelCache"));
2349
2350 storage.delete_table_metadata("DelCache").unwrap();
2352 assert!(!storage.metadata_cache.borrow().contains_key("DelCache"));
2353 }
2354
2355 #[test]
2356 fn test_metadata_cache_invalidated_on_stream_enable() {
2357 let storage = test_storage();
2358 storage
2359 .insert_table_metadata(&CreateTableMetadata {
2360 table_name: "StreamCache",
2361 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2362 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2363 created_at: 1000000,
2364 ..Default::default()
2365 })
2366 .unwrap();
2367
2368 let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2370 assert!(!meta.stream_enabled);
2371
2372 storage
2374 .enable_stream("StreamCache", "NEW_AND_OLD_IMAGES", "2024-01-01T00:00:00")
2375 .unwrap();
2376 assert!(!storage.metadata_cache.borrow().contains_key("StreamCache"));
2377
2378 let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2380 assert!(meta.stream_enabled);
2381 }
2382
2383 #[test]
2384 fn test_metadata_cache_invalidated_on_ttl_update() {
2385 let storage = test_storage();
2386 storage
2387 .insert_table_metadata(&CreateTableMetadata {
2388 table_name: "TtlCache",
2389 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2390 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2391 created_at: 1000000,
2392 ..Default::default()
2393 })
2394 .unwrap();
2395
2396 let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2398 assert!(!meta.ttl_enabled);
2399
2400 storage
2402 .update_ttl_config("TtlCache", Some("expires_at"), true)
2403 .unwrap();
2404 assert!(!storage.metadata_cache.borrow().contains_key("TtlCache"));
2405
2406 let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2408 assert!(meta.ttl_enabled);
2409 assert_eq!(meta.ttl_attribute, Some("expires_at".to_string()));
2410 }
2411
2412 #[test]
2413 fn test_num_to_buffer_zero() {
2414 assert_eq!(num_to_buffer("0"), vec![0x80]);
2416 assert_eq!(num_to_buffer("-0"), vec![0x80]);
2417 }
2418
2419 #[test]
2420 fn test_hash_prefix_string_keys() {
2421 let h1 = compute_hash_prefix(&AttributeValue::S("3635".into()));
2424 let h2 = compute_hash_prefix(&AttributeValue::S("228".into()));
2425 let h3 = compute_hash_prefix(&AttributeValue::S("1668".into()));
2426 let h4 = compute_hash_prefix(&AttributeValue::S("3435".into()));
2427
2428 assert_eq!(
2431 hash_bucket(&h1),
2432 0,
2433 "3635 should be bucket 0, got hash {h1}"
2434 );
2435 assert_eq!(hash_bucket(&h2), 0, "228 should be bucket 0, got hash {h2}");
2436
2437 assert_eq!(
2439 hash_bucket(&h3),
2440 1,
2441 "1668 should be bucket 1, got hash {h3}"
2442 );
2443
2444 assert_eq!(
2446 hash_bucket(&h4),
2447 4,
2448 "3435 should be bucket 4, got hash {h4}"
2449 );
2450 }
2451
2452 #[test]
2453 fn test_hash_prefix_number_keys() {
2454 let h1 = compute_hash_prefix(&AttributeValue::N("251".into()));
2457 assert_eq!(hash_bucket(&h1), 1, "251 should be bucket 1, got hash {h1}");
2458
2459 let h2 = compute_hash_prefix(&AttributeValue::N("2388".into()));
2461 assert_eq!(
2462 hash_bucket(&h2),
2463 4095,
2464 "2388 should be bucket 4095, got hash {h2}"
2465 );
2466 }
2467
2468 #[test]
2469 fn test_hash_in_segment() {
2470 assert!(hash_in_segment("000000", 0, 4096));
2472 assert!(!hash_in_segment("000000", 1, 4096));
2473
2474 assert!(hash_in_segment("001000", 1, 4096));
2476 assert!(!hash_in_segment("001000", 0, 4096));
2477
2478 assert!(hash_in_segment("fff000", 4095, 4096));
2480 assert!(!hash_in_segment("fff000", 0, 4096));
2481
2482 assert!(hash_in_segment("000000", 0, 2));
2484 assert!(hash_in_segment("7ff000", 0, 2));
2485 assert!(hash_in_segment("800000", 1, 2));
2486 assert!(hash_in_segment("fff000", 1, 2));
2487 }
2488}