1use crate::errors::{DynoxideError, Result};
2use crate::types::AttributeValue;
3use rusqlite::{Connection, params};
4use std::cell::RefCell;
5use std::collections::HashMap;
6
7const SCHEMA_VERSION: &str = "6";
9
10const HASH_BUCKETS: u32 = 4096;
13
14pub fn compute_hash_prefix(pk_value: &AttributeValue) -> String {
28 let key_bytes = match pk_value {
29 AttributeValue::S(s) => s.as_bytes().to_vec(),
30 AttributeValue::N(n) => num_to_buffer(n),
31 AttributeValue::B(b) => b.clone(),
32 _ => vec![], };
34
35 let digest = md5::compute([b"Outliers" as &[u8], &key_bytes].concat());
36 format!("{:032x}", digest)[..6].to_string()
37}
38
39pub fn hash_bucket(hash_prefix: &str) -> u32 {
41 let prefix_3 = &hash_prefix[..3.min(hash_prefix.len())];
42 u32::from_str_radix(prefix_3, 16).unwrap_or(0)
43}
44
45fn num_to_buffer(num_str: &str) -> Vec<u8> {
51 let trimmed = num_str.trim();
52 if trimmed.is_empty() {
53 return vec![0x80];
54 }
55
56 use bigdecimal::BigDecimal;
57 use std::str::FromStr;
58
59 let bd = match BigDecimal::from_str(trimmed) {
60 Ok(v) => v,
61 Err(_) => return vec![0x80],
62 };
63
64 if bd.sign() == bigdecimal::num_bigint::Sign::NoSign {
65 return vec![0x80];
66 }
67
68 let is_negative = bd.sign() == bigdecimal::num_bigint::Sign::Minus;
69 let bd_abs = if is_negative { -&bd } else { bd.clone() };
70
71 let (mantissa, exponent) = extract_mantissa_and_exponent(&bd_abs);
72 if mantissa.is_empty() {
73 return vec![0x80];
74 }
75
76 let append_zero: i64 = if exponent % 2 != 0 { 1 } else { 0 };
79 let byte_len_no_exp = ((mantissa.len() as i64 + append_zero + 1) / 2) as usize;
80
81 let mut byte_array: Vec<u8>;
82 if byte_len_no_exp < 20 && is_negative {
83 byte_array = vec![0u8; byte_len_no_exp + 2];
84 byte_array[byte_len_no_exp + 1] = 102;
85 } else {
86 byte_array = vec![0u8; byte_len_no_exp + 1];
87 }
88
89 let exp_sum = exponent + append_zero;
93 let exp_byte_val = floor_div(exp_sum, 2) - 64;
94 if is_negative {
95 byte_array[0] = (exp_byte_val ^ !0i64) as u8;
98 } else {
99 byte_array[0] = exp_byte_val as u8;
100 }
101
102 let mut mi: i64 = 0; let mlen = mantissa.len() as i64;
106 let mut appended_zero = false;
107
108 while mi < mlen {
109 let bai = ((mi + append_zero) / 2 + 1) as usize; if append_zero != 0 && mi == 0 && !appended_zero {
111 byte_array[bai] = 0;
112 appended_zero = true;
113 mi -= 1; } else if (mi + append_zero) % 2 == 0 {
115 byte_array[bai] = mantissa[mi as usize] * 10;
116 } else {
117 byte_array[bai] += mantissa[mi as usize];
118 }
119
120 if ((mi + append_zero) % 2 != 0) || (mi == mlen - 1) {
122 if is_negative {
123 byte_array[bai] = 101u8.wrapping_sub(byte_array[bai]);
124 } else {
125 byte_array[bai] = byte_array[bai].wrapping_add(1);
126 }
127 }
128
129 mi += 1; }
131
132 byte_array
133}
134
135fn floor_div(a: i64, b: i64) -> i64 {
137 let d = a / b;
138 let r = a % b;
139 if (r != 0) && ((r ^ b) < 0) { d - 1 } else { d }
140}
141
142fn extract_mantissa_and_exponent(bd: &bigdecimal::BigDecimal) -> (Vec<u8>, i64) {
149 let normalized = bd.normalized();
151
152 let (bigint, scale) = normalized.as_bigint_and_exponent();
155 let digits_str = bigint.to_string();
156 let digits_str = digits_str.trim_start_matches('-');
157
158 let digits: Vec<u8> = digits_str
159 .chars()
160 .map(|c| c.to_digit(10).unwrap() as u8)
161 .collect();
162
163 let exponent = digits.len() as i64 - scale;
166
167 (digits, exponent)
168}
169
170pub fn hash_in_segment(hash_prefix: &str, segment: u32, total_segments: u32) -> bool {
177 let bucket = hash_bucket(hash_prefix);
178 let start = ceiling_div(HASH_BUCKETS * segment, total_segments);
179 let end = ceiling_div(HASH_BUCKETS * (segment + 1), total_segments) - 1;
180 bucket >= start && bucket <= end
181}
182
183fn ceiling_div(a: u32, b: u32) -> u32 {
184 a.div_ceil(b)
185}
186
187#[derive(Debug, Default)]
189pub struct ScanParams<'a> {
190 pub limit: Option<usize>,
191 pub exclusive_start_pk: Option<&'a str>,
192 pub exclusive_start_sk: Option<&'a str>,
193 pub segment: Option<u32>,
194 pub total_segments: Option<u32>,
195 pub exclusive_start_base_pk: Option<&'a str>,
197 pub exclusive_start_base_sk: Option<&'a str>,
199}
200
201#[derive(Debug, Default)]
203pub struct CreateTableMetadata<'a> {
204 pub table_name: &'a str,
205 pub key_schema: &'a str,
206 pub attribute_definitions: &'a str,
207 pub gsi_definitions: Option<&'a str>,
208 pub lsi_definitions: Option<&'a str>,
209 pub provisioned_throughput: Option<&'a str>,
210 pub created_at: i64,
211 pub sse_specification: Option<&'a str>,
212 pub table_class: Option<&'a str>,
213 pub deletion_protection_enabled: bool,
214 pub billing_mode: Option<&'a str>,
215}
216
217#[derive(Debug, Default)]
219pub struct QueryParams<'a> {
220 pub sk_condition: Option<&'a str>,
221 pub sk_params: &'a [&'a str],
222 pub forward: bool,
223 pub limit: Option<usize>,
224 pub exclusive_start_sk: Option<&'a str>,
225 pub exclusive_start_base_pk: Option<&'a str>,
227 pub exclusive_start_base_sk: Option<&'a str>,
229}
230
231pub struct Storage {
236 conn: Connection,
237 metadata_cache: RefCell<HashMap<String, TableMetadata>>,
240}
241
242impl Storage {
243 pub fn new(path: &str) -> Result<Self> {
245 let conn = Connection::open(path)?;
246 let mut storage = Self {
247 conn,
248 metadata_cache: RefCell::new(HashMap::new()),
249 };
250 storage.initialize().map_err(Self::maybe_encrypted_error)?;
251 Ok(storage)
252 }
253
254 fn maybe_encrypted_error(err: DynoxideError) -> DynoxideError {
257 if let DynoxideError::SqliteError(ref sqlite_err) = err {
258 if let Some(rusqlite::ErrorCode::NotADatabase) = sqlite_err.sqlite_error_code() {
259 return DynoxideError::InternalServerError(
260 "Database file is encrypted or not a valid SQLite database. \
261 If encrypted, enable the `encryption` or `encryption-cc` feature \
262 and use Database::new_encrypted() with the correct key."
263 .to_string(),
264 );
265 }
266 }
267 err
268 }
269
270 #[cfg(feature = "_has-encryption")]
276 pub fn new_encrypted(path: &str, key: &str) -> Result<Self> {
277 use zeroize::Zeroize;
278
279 let conn = Connection::open(path)?;
280 let mut pragma_val = format!("x'{key}'");
285 conn.pragma_update(None, "key", &pragma_val)?;
286 pragma_val.zeroize();
287 conn.execute_batch("SELECT count(*) FROM sqlite_master;")?;
289 let mut storage = Self {
290 conn,
291 metadata_cache: RefCell::new(HashMap::new()),
292 };
293 storage.initialize()?;
294 Ok(storage)
295 }
296
297 pub fn memory() -> Result<Self> {
299 let conn = Connection::open_in_memory()?;
300 let mut storage = Self {
301 conn,
302 metadata_cache: RefCell::new(HashMap::new()),
303 };
304 storage.initialize()?;
305 Ok(storage)
306 }
307
308 fn initialize(&mut self) -> Result<()> {
310 self.conn.pragma_update(None, "journal_mode", "WAL")?;
312
313 self.conn.create_scalar_function(
316 "fnv1a_hash",
317 1,
318 rusqlite::functions::FunctionFlags::SQLITE_DETERMINISTIC
319 | rusqlite::functions::FunctionFlags::SQLITE_UTF8,
320 |ctx: &rusqlite::functions::Context| -> rusqlite::Result<i64> {
321 let pk_ref = ctx.get_raw(0);
322 let pk_bytes = match pk_ref {
323 rusqlite::types::ValueRef::Text(bytes) => bytes,
324 _ => {
325 return Err(rusqlite::Error::InvalidFunctionParameterType(
326 0,
327 rusqlite::types::Type::Text,
328 ));
329 }
330 };
331 let mut hash: u32 = 2166136261;
332 for &byte in pk_bytes {
333 hash ^= byte as u32;
334 hash = hash.wrapping_mul(16777619);
335 }
336 Ok(hash as i64)
337 },
338 )?;
339
340 self.conn.execute_batch(
342 "CREATE TABLE IF NOT EXISTS _config (
343 key TEXT PRIMARY KEY,
344 value TEXT NOT NULL
345 );
346
347 CREATE TABLE IF NOT EXISTS _tables (
348 table_name TEXT PRIMARY KEY,
349 key_schema TEXT NOT NULL,
350 attribute_definitions TEXT NOT NULL,
351 gsi_definitions TEXT,
352 lsi_definitions TEXT,
353 stream_enabled INTEGER DEFAULT 0,
354 stream_view_type TEXT,
355 stream_label TEXT,
356 ttl_attribute TEXT,
357 ttl_enabled INTEGER DEFAULT 0,
358 created_at INTEGER NOT NULL,
359 table_status TEXT NOT NULL DEFAULT 'ACTIVE',
360 billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
361 provisioned_throughput TEXT,
362 tags TEXT,
363 sse_specification TEXT,
364 table_class TEXT,
365 deletion_protection_enabled INTEGER DEFAULT 0
366 );
367
368 CREATE TABLE IF NOT EXISTS _stream_records (
369 id INTEGER PRIMARY KEY AUTOINCREMENT,
370 table_name TEXT NOT NULL,
371 event_name TEXT NOT NULL,
372 keys_json TEXT NOT NULL,
373 new_image TEXT,
374 old_image TEXT,
375 sequence_number TEXT NOT NULL,
376 shard_id TEXT NOT NULL,
377 created_at INTEGER NOT NULL,
378 user_identity TEXT
379 );",
380 )?;
381
382 let _ = self
384 .conn
385 .execute_batch("ALTER TABLE _stream_records ADD COLUMN user_identity TEXT");
386
387 self.conn.execute(
389 "INSERT OR IGNORE INTO _config (key, value) VALUES ('schema_version', ?1)",
390 params![SCHEMA_VERSION],
391 )?;
392
393 let version: i32 = self
395 .conn
396 .query_row(
397 "SELECT value FROM _config WHERE key = 'schema_version'",
398 [],
399 |r| r.get::<_, String>(0),
400 )
401 .unwrap_or_else(|_| "1".to_string())
402 .parse()
403 .unwrap_or(1);
404
405 if version < 2 {
406 self.migrate_v1_to_v2()?;
407 }
408 if version < 3 {
409 self.migrate_v2_to_v3()?;
410 }
411 if version < 4 {
412 self.migrate_v3_to_v4()?;
413 }
414 if version < 5 {
415 self.migrate_v4_to_v5()?;
416 }
417 if version < 6 {
418 self.migrate_v5_to_v6()?;
419 }
420
421 Ok(())
422 }
423
424 fn migrate_v1_to_v2(&self) -> Result<()> {
426 let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
427 let table_names: Vec<String> = stmt
428 .query_map([], |row| row.get(0))?
429 .collect::<std::result::Result<Vec<_>, _>>()?;
430
431 for table_name in &table_names {
432 let escaped = format!("\"{}\"", table_name.replace('"', "\"\""));
433 let _ = self.conn.execute(
434 &format!("ALTER TABLE {escaped} ADD COLUMN cached_at REAL"),
435 [],
436 );
437 }
438
439 self.conn.execute(
440 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '2')",
441 [],
442 )?;
443
444 Ok(())
445 }
446
447 fn migrate_v2_to_v3(&self) -> Result<()> {
449 let _ = self
450 .conn
451 .execute("ALTER TABLE _tables ADD COLUMN tags TEXT", []);
452
453 self.conn.execute(
454 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '3')",
455 [],
456 )?;
457
458 Ok(())
459 }
460
461 fn migrate_v3_to_v4(&self) -> Result<()> {
463 let _ = self
464 .conn
465 .execute("ALTER TABLE _tables ADD COLUMN sse_specification TEXT", []);
466 let _ = self
467 .conn
468 .execute("ALTER TABLE _tables ADD COLUMN table_class TEXT", []);
469 let _ = self.conn.execute(
470 "ALTER TABLE _tables ADD COLUMN deletion_protection_enabled INTEGER DEFAULT 0",
471 [],
472 );
473
474 self.conn.execute(
475 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '4')",
476 [],
477 )?;
478
479 Ok(())
480 }
481
482 fn migrate_v4_to_v5(&self) -> Result<()> {
484 let mut stmt = self
485 .conn
486 .prepare("SELECT table_name, gsi_definitions, lsi_definitions FROM _tables")?;
487 let tables: Vec<(String, Option<String>, Option<String>)> = stmt
488 .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
489 .collect::<std::result::Result<Vec<_>, _>>()?;
490
491 for (table_name, gsi_json, lsi_json) in &tables {
492 if let Some(json) = gsi_json {
493 if let Ok(gsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
494 for gsi in &gsis {
495 if let Some(idx) = gsi.get("IndexName").and_then(|v| v.as_str()) {
496 let gsi_table = escape_table_name(&format!("{table_name}::gsi::{idx}"));
497 let idx_name =
498 escape_table_name(&format!("{table_name}::gsi::{idx}::base_key"));
499 let _ = self.conn.execute_batch(&format!(
500 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{gsi_table}\" (table_pk, table_sk)"
501 ));
502 }
503 }
504 }
505 }
506 if let Some(json) = lsi_json {
507 if let Ok(lsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
508 for lsi in &lsis {
509 if let Some(idx) = lsi.get("IndexName").and_then(|v| v.as_str()) {
510 let lsi_table = escape_table_name(&format!("{table_name}::lsi::{idx}"));
511 let idx_name =
512 escape_table_name(&format!("{table_name}::lsi::{idx}::base_key"));
513 let _ = self.conn.execute_batch(&format!(
514 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{lsi_table}\" (base_pk, base_sk)"
515 ));
516 }
517 }
518 }
519 }
520 }
521
522 self.conn.execute(
523 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '5')",
524 [],
525 )?;
526
527 Ok(())
528 }
529
530 fn migrate_v5_to_v6(&self) -> Result<()> {
532 let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
533 let table_names: Vec<String> = stmt
534 .query_map([], |row| row.get(0))?
535 .collect::<std::result::Result<Vec<_>, _>>()?;
536
537 for table_name in &table_names {
538 let escaped = escape_table_name(table_name);
539 let _ = self.conn.execute(
540 &format!(
541 "ALTER TABLE \"{escaped}\" ADD COLUMN hash_prefix TEXT NOT NULL DEFAULT ''"
542 ),
543 [],
544 );
545 }
546
547 self.conn.execute(
548 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '6')",
549 [],
550 )?;
551
552 Ok(())
553 }
554
555 pub fn conn(&self) -> &Connection {
557 &self.conn
558 }
559
560 pub fn conn_mut(&mut self) -> &mut Connection {
562 &mut self.conn
563 }
564
565 pub fn insert_table_metadata(&self, m: &CreateTableMetadata) -> Result<()> {
571 let table_name = m.table_name;
572 self.conn.execute(
573 "INSERT INTO _tables (table_name, key_schema, attribute_definitions, gsi_definitions, \
574 lsi_definitions, provisioned_throughput, created_at, sse_specification, table_class, \
575 deletion_protection_enabled, billing_mode)
576 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
577 params![
578 m.table_name,
579 m.key_schema,
580 m.attribute_definitions,
581 m.gsi_definitions,
582 m.lsi_definitions,
583 m.provisioned_throughput,
584 m.created_at,
585 m.sse_specification,
586 m.table_class,
587 m.deletion_protection_enabled as i32,
588 m.billing_mode,
589 ],
590 )?;
591 self.metadata_cache.borrow_mut().remove(table_name);
592 Ok(())
593 }
594
595 pub fn get_table_metadata(&self, table_name: &str) -> Result<Option<TableMetadata>> {
601 if let Some(cached) = self.metadata_cache.borrow().get(table_name) {
603 return Ok(Some(cached.clone()));
604 }
605
606 let sql = format!("SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE table_name = ?1");
607 let mut stmt = self.conn.prepare(&sql)?;
608
609 let result = stmt.query_row(params![table_name], row_to_metadata);
610
611 match result {
612 Ok(meta) => {
613 self.metadata_cache
614 .borrow_mut()
615 .insert(table_name.to_string(), meta.clone());
616 Ok(Some(meta))
617 }
618 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
619 Err(e) => Err(DynoxideError::from(e)),
620 }
621 }
622
623 pub fn delete_table_metadata(&self, table_name: &str) -> Result<bool> {
625 let affected = self.conn.execute(
626 "DELETE FROM _tables WHERE table_name = ?1",
627 params![table_name],
628 )?;
629 self.metadata_cache.borrow_mut().remove(table_name);
630 Ok(affected > 0)
631 }
632
633 pub fn update_table_metadata(
635 &self,
636 table_name: &str,
637 attribute_definitions: &str,
638 gsi_definitions: Option<&str>,
639 ) -> Result<()> {
640 self.conn.execute(
641 "UPDATE _tables SET attribute_definitions = ?1, gsi_definitions = ?2 WHERE table_name = ?3",
642 params![attribute_definitions, gsi_definitions, table_name],
643 )?;
644 self.metadata_cache.borrow_mut().remove(table_name);
645 Ok(())
646 }
647
648 pub fn update_provisioned_throughput(
650 &self,
651 table_name: &str,
652 provisioned_throughput: &str,
653 ) -> Result<()> {
654 self.conn.execute(
655 "UPDATE _tables SET provisioned_throughput = ?1 WHERE table_name = ?2",
656 params![provisioned_throughput, table_name],
657 )?;
658 self.metadata_cache.borrow_mut().remove(table_name);
659 Ok(())
660 }
661
662 pub fn clear_provisioned_throughput(&self, table_name: &str) -> Result<()> {
664 self.conn.execute(
665 "UPDATE _tables SET provisioned_throughput = NULL WHERE table_name = ?1",
666 params![table_name],
667 )?;
668 self.metadata_cache.borrow_mut().remove(table_name);
669 Ok(())
670 }
671
672 pub fn update_billing_mode(&self, table_name: &str, billing_mode: &str) -> Result<()> {
674 self.conn.execute(
675 "UPDATE _tables SET billing_mode = ?1 WHERE table_name = ?2",
676 params![billing_mode, table_name],
677 )?;
678 self.metadata_cache.borrow_mut().remove(table_name);
679 Ok(())
680 }
681
682 pub fn get_tags(&self, table_name: &str) -> Result<Vec<crate::types::Tag>> {
688 let tags_json: Option<String> = self.conn.query_row(
689 "SELECT tags FROM _tables WHERE table_name = ?1",
690 params![table_name],
691 |row| row.get(0),
692 )?;
693
694 match tags_json {
695 Some(json) => serde_json::from_str(&json)
696 .map_err(|e| DynoxideError::InternalServerError(format!("Bad tags JSON: {e}"))),
697 None => Ok(Vec::new()),
698 }
699 }
700
701 pub fn set_tags(&self, table_name: &str, new_tags: &[crate::types::Tag]) -> Result<()> {
703 use std::collections::BTreeMap;
704
705 let existing = self.get_tags(table_name)?;
706 let mut tag_map: BTreeMap<String, String> =
707 existing.into_iter().map(|t| (t.key, t.value)).collect();
708
709 for tag in new_tags {
710 tag_map.insert(tag.key.clone(), tag.value.clone());
711 }
712
713 if tag_map.len() > 50 {
714 return Err(DynoxideError::ValidationException(
715 "One or more parameter values were invalid: \
716 Too many tags: tag limit is 50"
717 .to_string(),
718 ));
719 }
720
721 let merged: Vec<crate::types::Tag> = tag_map
722 .into_iter()
723 .map(|(k, v)| crate::types::Tag { key: k, value: v })
724 .collect();
725
726 let json = serde_json::to_string(&merged)
727 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
728
729 self.conn.execute(
730 "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
731 params![json, table_name],
732 )?;
733 Ok(())
734 }
735
736 pub fn update_deletion_protection(&self, table_name: &str, enabled: bool) -> Result<()> {
738 self.conn.execute(
739 "UPDATE _tables SET deletion_protection_enabled = ?1 WHERE table_name = ?2",
740 params![enabled as i32, table_name],
741 )?;
742 self.metadata_cache.borrow_mut().remove(table_name);
743 Ok(())
744 }
745
746 pub fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<()> {
748 let mut tags = self.get_tags(table_name)?;
749 tags.retain(|t| !keys.contains(&t.key));
750
751 let json = if tags.is_empty() {
752 None
753 } else {
754 Some(
755 serde_json::to_string(&tags)
756 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
757 )
758 };
759
760 self.conn.execute(
761 "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
762 params![json, table_name],
763 )?;
764 Ok(())
765 }
766
767 pub fn list_table_names(&self) -> Result<Vec<String>> {
769 let mut stmt = self
770 .conn
771 .prepare("SELECT table_name FROM _tables ORDER BY table_name")?;
772 let names = stmt
773 .query_map([], |row| row.get(0))?
774 .collect::<std::result::Result<Vec<String>, _>>()?;
775 Ok(names)
776 }
777
778 pub fn table_exists(&self, table_name: &str) -> Result<bool> {
780 let count: i32 = self.conn.query_row(
781 "SELECT COUNT(*) FROM _tables WHERE table_name = ?1",
782 params![table_name],
783 |row| row.get(0),
784 )?;
785 Ok(count > 0)
786 }
787
788 #[allow(dead_code)]
790 pub(crate) fn invalidate_metadata_cache(&self, table_name: &str) {
791 self.metadata_cache.borrow_mut().remove(table_name);
792 }
793
794 pub fn create_data_table(&self, table_name: &str) -> Result<()> {
800 let sql = format!(
801 "CREATE TABLE \"{}\" (
802 pk TEXT NOT NULL,
803 sk TEXT NOT NULL DEFAULT '',
804 item_json TEXT NOT NULL,
805 item_size INTEGER NOT NULL,
806 cached_at REAL,
807 hash_prefix TEXT NOT NULL DEFAULT '',
808 PRIMARY KEY (pk, sk)
809 )",
810 escape_table_name(table_name)
811 );
812 self.conn.execute(&sql, [])?;
813 Ok(())
814 }
815
816 pub fn drop_data_table(&self, table_name: &str) -> Result<()> {
818 let sql = format!("DROP TABLE IF EXISTS \"{}\"", escape_table_name(table_name));
819 self.conn.execute(&sql, [])?;
820 Ok(())
821 }
822
823 pub fn create_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
825 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
826 let escaped = escape_table_name(&gsi_table_name);
827 let sql = format!(
828 "CREATE TABLE \"{escaped}\" (
829 gsi_pk TEXT NOT NULL,
830 gsi_sk TEXT NOT NULL DEFAULT '',
831 table_pk TEXT NOT NULL,
832 table_sk TEXT NOT NULL DEFAULT '',
833 item_json TEXT NOT NULL,
834 PRIMARY KEY (gsi_pk, gsi_sk, table_pk, table_sk)
835 )"
836 );
837 self.conn.execute(&sql, [])?;
838
839 let idx_name = escape_table_name(&format!("{gsi_table_name}::base_key"));
840 self.conn.execute_batch(&format!(
841 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{escaped}\" (table_pk, table_sk)"
842 ))?;
843 Ok(())
844 }
845
846 pub fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
848 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
849 let sql = format!(
850 "DROP TABLE IF EXISTS \"{}\"",
851 escape_table_name(&gsi_table_name)
852 );
853 self.conn.execute(&sql, [])?;
854 Ok(())
855 }
856
857 #[allow(clippy::too_many_arguments)]
863 pub fn insert_gsi_item(
864 &self,
865 table_name: &str,
866 index_name: &str,
867 gsi_pk: &str,
868 gsi_sk: &str,
869 table_pk: &str,
870 table_sk: &str,
871 item_json: &str,
872 ) -> Result<()> {
873 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
874 let sql = format!(
875 "INSERT OR REPLACE INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
876 escape_table_name(&gsi_table_name)
877 );
878 self.conn
879 .prepare_cached(&sql)?
880 .execute(params![gsi_pk, gsi_sk, table_pk, table_sk, item_json])?;
881 Ok(())
882 }
883
884 pub fn delete_gsi_item(
886 &self,
887 table_name: &str,
888 index_name: &str,
889 table_pk: &str,
890 table_sk: &str,
891 ) -> Result<()> {
892 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
893 let sql = format!(
894 "DELETE FROM \"{}\" WHERE table_pk = ?1 AND table_sk = ?2",
895 escape_table_name(&gsi_table_name)
896 );
897 self.conn
898 .prepare_cached(&sql)?
899 .execute(params![table_pk, table_sk])?;
900 Ok(())
901 }
902
903 pub fn query_gsi_items(
905 &self,
906 table_name: &str,
907 index_name: &str,
908 gsi_pk: &str,
909 params: &QueryParams,
910 ) -> Result<Vec<(String, String, String)>> {
911 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
912 let mut sql = format!(
913 "SELECT gsi_pk, gsi_sk, item_json FROM \"{}\" WHERE gsi_pk = ?1",
914 escape_table_name(&gsi_table_name)
915 );
916
917 let mut param_idx = 2;
918 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> =
919 vec![Box::new(gsi_pk.to_string())];
920
921 if let Some(cond) = params.sk_condition {
922 let gsi_cond = cond.replace(" sk ", " gsi_sk ").replace(" sk>", " gsi_sk>");
929 sql.push(' ');
930 sql.push_str(&gsi_cond);
931 for &p in params.sk_params {
932 all_params.push(Box::new(p.to_string()));
933 param_idx += 1;
934 }
935 }
936
937 if let (Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
940 params.exclusive_start_sk,
941 params.exclusive_start_base_pk,
942 params.exclusive_start_base_sk,
943 ) {
944 let op = if params.forward { ">" } else { "<" };
945 sql.push_str(&format!(
946 " AND (gsi_sk, table_pk, table_sk) {op} (?{}, ?{}, ?{})",
947 param_idx,
948 param_idx + 1,
949 param_idx + 2
950 ));
951 all_params.push(Box::new(start_sk.to_string()));
952 all_params.push(Box::new(start_base_pk.to_string()));
953 all_params.push(Box::new(start_base_sk.to_string()));
954 } else if let Some(start_sk) = params.exclusive_start_sk {
955 if params.forward {
956 sql.push_str(&format!(" AND gsi_sk > ?{param_idx}"));
957 } else {
958 sql.push_str(&format!(" AND gsi_sk < ?{param_idx}"));
959 }
960 all_params.push(Box::new(start_sk.to_string()));
961 }
962
963 sql.push_str(if params.forward {
964 " ORDER BY gsi_sk ASC, table_pk ASC, table_sk ASC"
965 } else {
966 " ORDER BY gsi_sk DESC, table_pk DESC, table_sk DESC"
967 });
968
969 if let Some(lim) = params.limit {
970 sql.push_str(&format!(" LIMIT {lim}"));
971 }
972
973 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
974 all_params.iter().map(|p| p.as_ref()).collect();
975 let mut stmt = self.conn.prepare(&sql)?;
976 let rows = stmt
977 .query_map(param_refs.as_slice(), |row| {
978 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
979 })?
980 .collect::<std::result::Result<Vec<_>, _>>()?;
981
982 Ok(rows)
983 }
984
985 pub fn scan_gsi_items(
987 &self,
988 table_name: &str,
989 index_name: &str,
990 params: &ScanParams,
991 ) -> Result<Vec<(String, String, String)>> {
992 let gsi_table_name = format!("{table_name}::gsi::{index_name}");
993 let mut sql = format!(
994 "SELECT gsi_pk, gsi_sk, item_json FROM \"{}\"",
995 escape_table_name(&gsi_table_name)
996 );
997
998 let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
999 let mut where_clauses = Vec::new();
1000 let mut param_idx = 1;
1001
1002 if let (Some(start_pk), Some(start_sk)) =
1003 (params.exclusive_start_pk, params.exclusive_start_sk)
1004 {
1005 if let (Some(base_pk), Some(base_sk)) = (
1009 params.exclusive_start_base_pk,
1010 params.exclusive_start_base_sk,
1011 ) {
1012 where_clauses.push(format!(
1013 "(gsi_pk, gsi_sk, table_pk, table_sk) > (?{}, ?{}, ?{}, ?{})",
1014 param_idx,
1015 param_idx + 1,
1016 param_idx + 2,
1017 param_idx + 3
1018 ));
1019 params_vec.push(Box::new(start_pk.to_string()));
1020 params_vec.push(Box::new(start_sk.to_string()));
1021 params_vec.push(Box::new(base_pk.to_string()));
1022 params_vec.push(Box::new(base_sk.to_string()));
1023 param_idx += 4;
1024 } else {
1025 where_clauses.push(format!(
1026 "(gsi_pk, gsi_sk) > (?{}, ?{})",
1027 param_idx,
1028 param_idx + 1
1029 ));
1030 params_vec.push(Box::new(start_pk.to_string()));
1031 params_vec.push(Box::new(start_sk.to_string()));
1032 param_idx += 2;
1033 }
1034 }
1035
1036 if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
1038 where_clauses.push(format!(
1039 "(fnv1a_hash(table_pk) % ?{}) = ?{}",
1040 param_idx,
1041 param_idx + 1
1042 ));
1043 params_vec.push(Box::new(total as i64));
1044 params_vec.push(Box::new(seg as i64));
1045 }
1046
1047 if !where_clauses.is_empty() {
1048 sql.push_str(" WHERE ");
1049 sql.push_str(&where_clauses.join(" AND "));
1050 }
1051
1052 sql.push_str(" ORDER BY gsi_pk ASC, gsi_sk ASC, table_pk ASC, table_sk ASC");
1053
1054 if let Some(lim) = params.limit {
1055 sql.push_str(&format!(" LIMIT {lim}"));
1056 }
1057
1058 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1059 params_vec.iter().map(|p| p.as_ref()).collect();
1060 let mut stmt = self.conn.prepare(&sql)?;
1061 let rows = stmt
1062 .query_map(param_refs.as_slice(), |row| {
1063 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1064 })?
1065 .collect::<std::result::Result<Vec<_>, _>>()?;
1066
1067 Ok(rows)
1068 }
1069
1070 pub fn create_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1076 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1077 let escaped = escape_table_name(&lsi_table_name);
1078 let sql = format!(
1079 "CREATE TABLE \"{escaped}\" (
1080 pk TEXT NOT NULL,
1081 sk TEXT NOT NULL DEFAULT '',
1082 base_pk TEXT NOT NULL,
1083 base_sk TEXT NOT NULL DEFAULT '',
1084 item_json TEXT NOT NULL,
1085 PRIMARY KEY (pk, sk, base_pk, base_sk)
1086 )"
1087 );
1088 self.conn.execute(&sql, [])?;
1089
1090 let idx_name = escape_table_name(&format!("{lsi_table_name}::base_key"));
1091 self.conn.execute_batch(&format!(
1092 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{escaped}\" (base_pk, base_sk)"
1093 ))?;
1094 Ok(())
1095 }
1096
1097 pub fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1099 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1100 let sql = format!(
1101 "DROP TABLE IF EXISTS \"{}\"",
1102 escape_table_name(&lsi_table_name)
1103 );
1104 self.conn.execute(&sql, [])?;
1105 Ok(())
1106 }
1107
1108 #[allow(clippy::too_many_arguments)]
1114 pub fn insert_lsi_item(
1115 &self,
1116 table_name: &str,
1117 index_name: &str,
1118 pk: &str,
1119 sk: &str,
1120 base_pk: &str,
1121 base_sk: &str,
1122 item_json: &str,
1123 ) -> Result<()> {
1124 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1125 let sql = format!(
1126 "INSERT OR REPLACE INTO \"{}\" (pk, sk, base_pk, base_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
1127 escape_table_name(&lsi_table_name)
1128 );
1129 self.conn
1130 .prepare_cached(&sql)?
1131 .execute(params![pk, sk, base_pk, base_sk, item_json])?;
1132 Ok(())
1133 }
1134
1135 pub fn delete_lsi_item(
1137 &self,
1138 table_name: &str,
1139 index_name: &str,
1140 base_pk: &str,
1141 base_sk: &str,
1142 ) -> Result<()> {
1143 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1144 let sql = format!(
1145 "DELETE FROM \"{}\" WHERE base_pk = ?1 AND base_sk = ?2",
1146 escape_table_name(&lsi_table_name)
1147 );
1148 self.conn
1149 .prepare_cached(&sql)?
1150 .execute(params![base_pk, base_sk])?;
1151 Ok(())
1152 }
1153
1154 pub fn query_lsi_items(
1156 &self,
1157 table_name: &str,
1158 index_name: &str,
1159 pk: &str,
1160 params: &QueryParams,
1161 ) -> Result<Vec<(String, String, String)>> {
1162 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1163 let mut sql = format!(
1164 "SELECT pk, sk, item_json FROM \"{}\" WHERE pk = ?1",
1165 escape_table_name(&lsi_table_name)
1166 );
1167
1168 let mut param_idx = 2;
1169 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(pk.to_string())];
1170
1171 if let Some(cond) = params.sk_condition {
1172 sql.push(' ');
1173 sql.push_str(cond);
1174 for &p in params.sk_params {
1175 all_params.push(Box::new(p.to_string()));
1176 param_idx += 1;
1177 }
1178 }
1179
1180 if let (Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
1183 params.exclusive_start_sk,
1184 params.exclusive_start_base_pk,
1185 params.exclusive_start_base_sk,
1186 ) {
1187 let op = if params.forward { ">" } else { "<" };
1188 sql.push_str(&format!(
1189 " AND (sk, base_pk, base_sk) {op} (?{}, ?{}, ?{})",
1190 param_idx,
1191 param_idx + 1,
1192 param_idx + 2
1193 ));
1194 all_params.push(Box::new(start_sk.to_string()));
1195 all_params.push(Box::new(start_base_pk.to_string()));
1196 all_params.push(Box::new(start_base_sk.to_string()));
1197 } else if let Some(start_sk) = params.exclusive_start_sk {
1198 if params.forward {
1199 sql.push_str(&format!(" AND sk > ?{param_idx}"));
1200 } else {
1201 sql.push_str(&format!(" AND sk < ?{param_idx}"));
1202 }
1203 all_params.push(Box::new(start_sk.to_string()));
1204 }
1205
1206 sql.push_str(if params.forward {
1207 " ORDER BY sk ASC, base_pk ASC, base_sk ASC"
1208 } else {
1209 " ORDER BY sk DESC, base_pk DESC, base_sk DESC"
1210 });
1211
1212 if let Some(lim) = params.limit {
1213 sql.push_str(&format!(" LIMIT {lim}"));
1214 }
1215
1216 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1217 all_params.iter().map(|p| p.as_ref()).collect();
1218 let mut stmt = self.conn.prepare(&sql)?;
1219 let rows = stmt
1220 .query_map(param_refs.as_slice(), |row| {
1221 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1222 })?
1223 .collect::<std::result::Result<Vec<_>, _>>()?;
1224
1225 Ok(rows)
1226 }
1227
1228 pub fn scan_lsi_items(
1230 &self,
1231 table_name: &str,
1232 index_name: &str,
1233 params: &ScanParams,
1234 ) -> Result<Vec<(String, String, String)>> {
1235 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1236 let mut sql = format!(
1237 "SELECT pk, sk, item_json FROM \"{}\"",
1238 escape_table_name(&lsi_table_name)
1239 );
1240
1241 let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1242 let mut where_clauses = Vec::new();
1243 let mut param_idx = 1;
1244
1245 if let (Some(start_pk), Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
1247 params.exclusive_start_pk,
1248 params.exclusive_start_sk,
1249 params.exclusive_start_base_pk,
1250 params.exclusive_start_base_sk,
1251 ) {
1252 where_clauses.push(format!(
1253 "(pk, sk, base_pk, base_sk) > (?{}, ?{}, ?{}, ?{})",
1254 param_idx,
1255 param_idx + 1,
1256 param_idx + 2,
1257 param_idx + 3
1258 ));
1259 params_vec.push(Box::new(start_pk.to_string()));
1260 params_vec.push(Box::new(start_sk.to_string()));
1261 params_vec.push(Box::new(start_base_pk.to_string()));
1262 params_vec.push(Box::new(start_base_sk.to_string()));
1263 param_idx += 4;
1264 } else if let (Some(start_pk), Some(start_sk)) =
1265 (params.exclusive_start_pk, params.exclusive_start_sk)
1266 {
1267 where_clauses.push(format!("(pk, sk) > (?{}, ?{})", param_idx, param_idx + 1));
1268 params_vec.push(Box::new(start_pk.to_string()));
1269 params_vec.push(Box::new(start_sk.to_string()));
1270 param_idx += 2;
1271 }
1272
1273 if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
1275 where_clauses.push(format!(
1276 "(fnv1a_hash(base_pk) % ?{}) = ?{}",
1277 param_idx,
1278 param_idx + 1
1279 ));
1280 params_vec.push(Box::new(total as i64));
1281 params_vec.push(Box::new(seg as i64));
1282 }
1283
1284 if !where_clauses.is_empty() {
1285 sql.push_str(" WHERE ");
1286 sql.push_str(&where_clauses.join(" AND "));
1287 }
1288
1289 sql.push_str(" ORDER BY pk ASC, sk ASC");
1290
1291 if let Some(lim) = params.limit {
1292 sql.push_str(&format!(" LIMIT {lim}"));
1293 }
1294
1295 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1296 params_vec.iter().map(|p| p.as_ref()).collect();
1297 let mut stmt = self.conn.prepare(&sql)?;
1298 let rows = stmt
1299 .query_map(param_refs.as_slice(), |row| {
1300 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1301 })?
1302 .collect::<std::result::Result<Vec<_>, _>>()?;
1303
1304 Ok(rows)
1305 }
1306
1307 pub fn begin_transaction(&self) -> Result<()> {
1313 self.conn.execute_batch("BEGIN IMMEDIATE")?;
1314 Ok(())
1315 }
1316
1317 pub fn commit(&self) -> Result<()> {
1319 self.conn.execute_batch("COMMIT")?;
1320 Ok(())
1321 }
1322
1323 pub fn rollback(&self) -> Result<()> {
1325 self.conn.execute_batch("ROLLBACK")?;
1326 Ok(())
1327 }
1328
1329 pub fn enable_bulk_loading(&self) -> Result<()> {
1335 self.conn.execute_batch(
1336 "PRAGMA synchronous = OFF;
1337 PRAGMA cache_size = -64000;
1338 PRAGMA temp_store = MEMORY;
1339 PRAGMA mmap_size = 268435456;",
1340 )?;
1341 Ok(())
1342 }
1343
1344 pub fn disable_bulk_loading(&self) -> Result<()> {
1346 self.conn.execute_batch(
1347 "PRAGMA synchronous = NORMAL;
1348 PRAGMA cache_size = -2000;
1349 PRAGMA temp_store = DEFAULT;
1350 PRAGMA mmap_size = 0;",
1351 )?;
1352 Ok(())
1353 }
1354
1355 pub fn put_item(
1361 &self,
1362 table_name: &str,
1363 pk: &str,
1364 sk: &str,
1365 item_json: &str,
1366 item_size: usize,
1367 ) -> Result<Option<String>> {
1368 self.put_item_with_hash(table_name, pk, sk, item_json, item_size, "")
1369 }
1370
1371 pub fn put_item_with_hash(
1373 &self,
1374 table_name: &str,
1375 pk: &str,
1376 sk: &str,
1377 item_json: &str,
1378 item_size: usize,
1379 hash_prefix: &str,
1380 ) -> Result<Option<String>> {
1381 let old_item = self.get_item(table_name, pk, sk)?;
1383
1384 let escaped = escape_table_name(table_name);
1385 let sql = format!(
1386 "INSERT OR REPLACE INTO \"{escaped}\" (pk, sk, item_json, item_size, cached_at, hash_prefix) \
1387 VALUES (?1, ?2, ?3, ?4, \
1388 (SELECT cached_at FROM \"{escaped}\" WHERE pk = ?1 AND sk = ?2), ?5)"
1389 );
1390 self.conn.execute(
1391 &sql,
1392 params![pk, sk, item_json, item_size as i64, hash_prefix],
1393 )?;
1394
1395 Ok(old_item)
1396 }
1397
1398 pub fn get_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1400 let sql = format!(
1401 "SELECT item_json FROM \"{}\" WHERE pk = ?1 AND sk = ?2",
1402 escape_table_name(table_name)
1403 );
1404 let result = self.conn.query_row(&sql, params![pk, sk], |row| row.get(0));
1405
1406 match result {
1407 Ok(json) => Ok(Some(json)),
1408 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1409 Err(e) => Err(DynoxideError::from(e)),
1410 }
1411 }
1412
1413 pub fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64> {
1415 let sql = format!(
1416 "SELECT COALESCE(SUM(item_size), 0) FROM \"{}\" WHERE pk = ?1",
1417 escape_table_name(table_name)
1418 );
1419 let size: i64 = self.conn.query_row(&sql, params![pk], |row| row.get(0))?;
1420 Ok(size)
1421 }
1422
1423 pub fn get_lsi_partition_size(
1426 &self,
1427 table_name: &str,
1428 index_name: &str,
1429 pk: &str,
1430 ) -> Result<i64> {
1431 let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1432 let sql = format!(
1433 "SELECT COALESCE(SUM(length(item_json)), 0) FROM \"{}\" WHERE pk = ?1",
1434 escape_table_name(&lsi_table_name)
1435 );
1436 let size: i64 = self.conn.query_row(&sql, params![pk], |row| row.get(0))?;
1437 Ok(size)
1438 }
1439
1440 pub fn delete_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1442 let old_item = self.get_item(table_name, pk, sk)?;
1443
1444 let sql = format!(
1445 "DELETE FROM \"{}\" WHERE pk = ?1 AND sk = ?2",
1446 escape_table_name(table_name)
1447 );
1448 self.conn.execute(&sql, params![pk, sk])?;
1449
1450 Ok(old_item)
1451 }
1452
1453 pub fn query_items(
1458 &self,
1459 table_name: &str,
1460 pk: &str,
1461 params: &QueryParams,
1462 ) -> Result<Vec<(String, String, String)>> {
1463 let mut sql = format!(
1464 "SELECT pk, sk, item_json FROM \"{}\" WHERE pk = ?1",
1465 escape_table_name(table_name)
1466 );
1467
1468 let mut param_idx = 2;
1469 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(pk.to_string())];
1470
1471 if let Some(cond) = params.sk_condition {
1472 sql.push(' ');
1473 sql.push_str(cond);
1474 for &p in params.sk_params {
1475 all_params.push(Box::new(p.to_string()));
1476 param_idx += 1;
1477 }
1478 }
1479
1480 if let Some(start_sk) = params.exclusive_start_sk {
1481 if params.forward {
1482 sql.push_str(&format!(" AND sk > ?{param_idx}"));
1483 } else {
1484 sql.push_str(&format!(" AND sk < ?{param_idx}"));
1485 }
1486 all_params.push(Box::new(start_sk.to_string()));
1487 }
1488
1489 sql.push_str(if params.forward {
1490 " ORDER BY sk ASC"
1491 } else {
1492 " ORDER BY sk DESC"
1493 });
1494
1495 if let Some(lim) = params.limit {
1496 sql.push_str(&format!(" LIMIT {lim}"));
1497 }
1498
1499 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1500 all_params.iter().map(|p| p.as_ref()).collect();
1501 let mut stmt = self.conn.prepare(&sql)?;
1502 let rows = stmt
1503 .query_map(param_refs.as_slice(), |row| {
1504 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1505 })?
1506 .collect::<std::result::Result<Vec<_>, _>>()?;
1507
1508 Ok(rows)
1509 }
1510
1511 pub fn scan_items(
1516 &self,
1517 table_name: &str,
1518 params: &ScanParams,
1519 ) -> Result<Vec<(String, String, String)>> {
1520 let mut sql = format!(
1521 "SELECT pk, sk, item_json FROM \"{}\"",
1522 escape_table_name(table_name)
1523 );
1524
1525 let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1526 let mut where_clauses = Vec::new();
1527 let mut param_idx = 1;
1528
1529 let is_parallel = params.segment.is_some() && params.total_segments.is_some();
1531
1532 if let (Some(start_pk), Some(start_sk)) =
1533 (params.exclusive_start_pk, params.exclusive_start_sk)
1534 {
1535 if is_parallel {
1536 where_clauses.push(format!(
1538 "(hash_prefix, pk, sk) > ((SELECT hash_prefix FROM \"{}\" WHERE pk = ?{} AND sk = ?{} LIMIT 1), ?{}, ?{})",
1539 escape_table_name(table_name),
1540 param_idx, param_idx + 1,
1541 param_idx, param_idx + 1
1542 ));
1543 } else {
1544 where_clauses.push(format!("(pk, sk) > (?{}, ?{})", param_idx, param_idx + 1));
1545 }
1546 params_vec.push(Box::new(start_pk.to_string()));
1547 params_vec.push(Box::new(start_sk.to_string()));
1548 param_idx += 2;
1549 }
1550
1551 if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
1552 let start_bucket = ceiling_div(HASH_BUCKETS * seg, total);
1556 let end_bucket = ceiling_div(HASH_BUCKETS * (seg + 1), total) - 1;
1557 let start_hex = format!("{:03x}", start_bucket);
1558 let end_hex = format!("{:03x}", end_bucket);
1559 where_clauses.push(format!(
1561 "substr(hash_prefix, 1, 3) >= ?{} AND substr(hash_prefix, 1, 3) <= ?{}",
1562 param_idx,
1563 param_idx + 1
1564 ));
1565 params_vec.push(Box::new(start_hex));
1566 params_vec.push(Box::new(end_hex));
1567 }
1568
1569 if !where_clauses.is_empty() {
1570 sql.push_str(" WHERE ");
1571 sql.push_str(&where_clauses.join(" AND "));
1572 }
1573
1574 if is_parallel {
1577 sql.push_str(" ORDER BY hash_prefix ASC, pk ASC, sk ASC");
1578 } else {
1579 sql.push_str(" ORDER BY pk ASC, sk ASC");
1580 }
1581
1582 if let Some(lim) = params.limit {
1583 sql.push_str(&format!(" LIMIT {lim}"));
1584 }
1585
1586 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1587 params_vec.iter().map(|p| p.as_ref()).collect();
1588 let mut stmt = self.conn.prepare(&sql)?;
1589 let rows = stmt
1590 .query_map(param_refs.as_slice(), |row| {
1591 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1592 })?
1593 .collect::<std::result::Result<Vec<_>, _>>()?;
1594
1595 Ok(rows)
1596 }
1597
1598 pub fn count_items(&self, table_name: &str) -> Result<i64> {
1600 let sql = format!("SELECT COUNT(*) FROM \"{}\"", escape_table_name(table_name));
1601 let count: i64 = self.conn.query_row(&sql, [], |row| row.get(0))?;
1602 Ok(count)
1603 }
1604
1605 pub fn db_path(&self) -> Option<String> {
1611 self.conn
1612 .path()
1613 .filter(|p| !p.is_empty())
1614 .map(|p| p.to_owned())
1615 }
1616
1617 pub fn db_size_bytes(&self) -> Result<u64> {
1619 let size: i64 = self.conn.query_row(
1620 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1621 [],
1622 |row| row.get(0),
1623 )?;
1624 Ok(size as u64)
1625 }
1626
1627 pub fn table_count(&self) -> Result<usize> {
1629 let count: i64 = self
1630 .conn
1631 .query_row("SELECT COUNT(*) FROM _tables", [], |row| row.get(0))?;
1632 Ok(count as usize)
1633 }
1634
1635 pub fn table_stats(&self) -> Result<Vec<TableStats>> {
1639 let table_names = self.list_table_names()?;
1640 let mut stats = Vec::with_capacity(table_names.len());
1641 for name in table_names {
1642 let sql = format!(
1643 "SELECT COUNT(*), COALESCE(SUM(item_size), 0) FROM \"{}\"",
1644 escape_table_name(&name)
1645 );
1646 let (item_count, size_bytes): (i64, i64) = self
1647 .conn
1648 .query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
1649 stats.push(TableStats {
1650 table_name: name,
1651 item_count,
1652 size_bytes: size_bytes as u64,
1653 });
1654 }
1655 Ok(stats)
1656 }
1657
1658 pub fn database_info(&self) -> Result<DatabaseInfo> {
1663 let path = self.db_path();
1664 let size_bytes = self.db_size_bytes()?;
1665 let table_count = self.table_count()?;
1666 let stats = self.table_stats()?;
1667
1668 let mut table_details = Vec::with_capacity(stats.len());
1669 for s in stats {
1670 let metadata = self.get_table_metadata(&s.table_name)?;
1671 table_details.push(TableInfoEntry { stats: s, metadata });
1672 }
1673
1674 Ok(DatabaseInfo {
1675 path,
1676 size_bytes,
1677 table_count,
1678 tables: table_details,
1679 })
1680 }
1681
1682 pub fn vacuum_into(&self, path: &str) -> Result<()> {
1689 if path.contains('\0') {
1690 return Err(DynoxideError::ValidationException(
1691 "path contains null byte".to_string(),
1692 ));
1693 }
1694 self.conn
1695 .execute_batch(&format!("VACUUM INTO '{}'", path.replace('\'', "''")))?;
1696 Ok(())
1697 }
1698
1699 pub fn vacuum(&self) -> Result<()> {
1701 self.conn.execute_batch("VACUUM")?;
1702 Ok(())
1703 }
1704
1705 pub fn restore_from(&mut self, path: &str) -> Result<()> {
1709 let source = Connection::open(path)?;
1710 self.restore_from_connection(&source)
1711 }
1712
1713 pub fn backup_to_memory(&self) -> Result<Connection> {
1718 let mut dest = Connection::open_in_memory()?;
1719 {
1720 let backup = rusqlite::backup::Backup::new(&self.conn, &mut dest)?;
1721 backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1722 }
1723 Ok(dest)
1724 }
1725
1726 pub fn restore_from_connection(&mut self, source: &Connection) -> Result<()> {
1731 let backup = rusqlite::backup::Backup::new(source, &mut self.conn)?;
1732 backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1733 self.metadata_cache.borrow_mut().clear();
1734 Ok(())
1735 }
1736
1737 pub fn connection_size_bytes(conn: &Connection) -> Result<u64> {
1739 let size: i64 = conn.query_row(
1740 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1741 [],
1742 |row| row.get(0),
1743 )?;
1744 Ok(size as u64)
1745 }
1746
1747 pub fn enable_stream(&self, table_name: &str, view_type: &str, label: &str) -> Result<()> {
1753 self.conn.execute(
1754 "UPDATE _tables SET stream_enabled = 1, stream_view_type = ?1, stream_label = ?2 WHERE table_name = ?3",
1755 params![view_type, label, table_name],
1756 )?;
1757 self.metadata_cache.borrow_mut().remove(table_name);
1758 Ok(())
1759 }
1760
1761 pub fn disable_stream(&self, table_name: &str) -> Result<()> {
1763 self.conn.execute(
1764 "UPDATE _tables SET stream_enabled = 0 WHERE table_name = ?1",
1765 params![table_name],
1766 )?;
1767 self.metadata_cache.borrow_mut().remove(table_name);
1768 Ok(())
1769 }
1770
1771 #[allow(clippy::too_many_arguments)]
1773 pub fn insert_stream_record(
1774 &self,
1775 table_name: &str,
1776 event_name: &str,
1777 keys_json: &str,
1778 new_image: Option<&str>,
1779 old_image: Option<&str>,
1780 sequence_number: &str,
1781 shard_id: &str,
1782 created_at: i64,
1783 ) -> Result<()> {
1784 self.insert_stream_record_with_identity(
1785 table_name,
1786 event_name,
1787 keys_json,
1788 new_image,
1789 old_image,
1790 sequence_number,
1791 shard_id,
1792 created_at,
1793 None,
1794 )
1795 }
1796
1797 #[allow(clippy::too_many_arguments)]
1799 pub fn insert_stream_record_with_identity(
1800 &self,
1801 table_name: &str,
1802 event_name: &str,
1803 keys_json: &str,
1804 new_image: Option<&str>,
1805 old_image: Option<&str>,
1806 sequence_number: &str,
1807 shard_id: &str,
1808 created_at: i64,
1809 user_identity: Option<&str>,
1810 ) -> Result<()> {
1811 self.conn.execute(
1812 "INSERT INTO _stream_records (table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity)
1813 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1814 params![table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity],
1815 )?;
1816 Ok(())
1817 }
1818
1819 pub fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64> {
1821 let result: std::result::Result<i64, _> = self.conn.query_row(
1822 "SELECT COALESCE(MAX(CAST(sequence_number AS INTEGER)), 0) + 1 FROM _stream_records WHERE table_name = ?1",
1823 params![table_name],
1824 |row| row.get(0),
1825 );
1826 match result {
1827 Ok(n) => Ok(n),
1828 Err(_) => Ok(1),
1829 }
1830 }
1831
1832 pub fn get_stream_records(
1834 &self,
1835 table_name: &str,
1836 shard_id: &str,
1837 after_sequence: i64,
1838 limit: usize,
1839 ) -> Result<Vec<StreamRecord>> {
1840 let mut stmt = self.conn.prepare(
1841 "SELECT event_name, keys_json, new_image, old_image, sequence_number, created_at, user_identity
1842 FROM _stream_records
1843 WHERE table_name = ?1 AND shard_id = ?2 AND CAST(sequence_number AS INTEGER) > ?3
1844 ORDER BY CAST(sequence_number AS INTEGER) ASC
1845 LIMIT ?4",
1846 )?;
1847 let rows = stmt
1848 .query_map(
1849 params![table_name, shard_id, after_sequence, limit as i64],
1850 |row| {
1851 Ok(StreamRecord {
1852 event_name: row.get(0)?,
1853 keys_json: row.get(1)?,
1854 new_image: row.get(2)?,
1855 old_image: row.get(3)?,
1856 sequence_number: row.get(4)?,
1857 created_at: row.get(5)?,
1858 user_identity: row.get(6)?,
1859 })
1860 },
1861 )?
1862 .collect::<std::result::Result<Vec<_>, _>>()?;
1863 Ok(rows)
1864 }
1865
1866 pub fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1868 let sql = format!(
1869 "SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE stream_enabled = 1 ORDER BY table_name"
1870 );
1871 let mut stmt = self.conn.prepare(&sql)?;
1872 let rows = stmt
1873 .query_map([], row_to_metadata)?
1874 .collect::<std::result::Result<Vec<_>, _>>()?;
1875 Ok(rows)
1876 }
1877
1878 pub fn update_ttl_config(
1884 &self,
1885 table_name: &str,
1886 attribute_name: Option<&str>,
1887 enabled: bool,
1888 ) -> Result<()> {
1889 self.conn.execute(
1890 "UPDATE _tables SET ttl_attribute = ?1, ttl_enabled = ?2 WHERE table_name = ?3",
1891 params![attribute_name, enabled as i32, table_name],
1892 )?;
1893 self.metadata_cache.borrow_mut().remove(table_name);
1894 Ok(())
1895 }
1896
1897 pub fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1899 let sql = format!(
1900 "SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE ttl_enabled = 1 ORDER BY table_name"
1901 );
1902 let mut stmt = self.conn.prepare(&sql)?;
1903 let rows = stmt
1904 .query_map([], row_to_metadata)?
1905 .collect::<std::result::Result<Vec<_>, _>>()?;
1906 Ok(rows)
1907 }
1908
1909 pub fn get_shard_sequence_range(
1911 &self,
1912 table_name: &str,
1913 shard_id: &str,
1914 ) -> Result<(Option<String>, Option<String>)> {
1915 let result: std::result::Result<(Option<String>, Option<String>), _> = self.conn.query_row(
1916 "SELECT MIN(sequence_number), MAX(sequence_number) FROM _stream_records WHERE table_name = ?1 AND shard_id = ?2",
1917 params![table_name, shard_id],
1918 |row| Ok((row.get(0)?, row.get(1)?)),
1919 );
1920 match result {
1921 Ok(range) => Ok(range),
1922 Err(_) => Ok((None, None)),
1923 }
1924 }
1925
1926 pub fn touch_cached_at(
1932 &self,
1933 table_name: &str,
1934 pk: &str,
1935 sk: &str,
1936 timestamp: f64,
1937 ) -> Result<()> {
1938 let sql = format!(
1939 "UPDATE \"{}\" SET cached_at = ?1 WHERE pk = ?2 AND sk = ?3",
1940 escape_table_name(table_name)
1941 );
1942 self.conn.execute(&sql, params![timestamp, pk, sk])?;
1943 Ok(())
1944 }
1945
1946 pub fn get_lru_items(
1951 &self,
1952 table_name: &str,
1953 limit: usize,
1954 ) -> Result<Vec<(String, String, i64)>> {
1955 let sql = format!(
1956 "SELECT pk, sk, item_size FROM \"{}\" WHERE cached_at IS NOT NULL ORDER BY cached_at ASC LIMIT ?1",
1957 escape_table_name(table_name)
1958 );
1959 let mut stmt = self.conn.prepare(&sql)?;
1960 let rows = stmt
1961 .query_map(params![limit as i64], |row| {
1962 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1963 })?
1964 .collect::<std::result::Result<Vec<_>, _>>()?;
1965 Ok(rows)
1966 }
1967}
1968
1969#[derive(Debug, Clone)]
1971pub struct StreamRecord {
1972 pub event_name: String,
1973 pub keys_json: String,
1974 pub new_image: Option<String>,
1975 pub old_image: Option<String>,
1976 pub sequence_number: String,
1977 pub created_at: i64,
1978 pub user_identity: Option<String>,
1979}
1980
1981#[derive(Debug, Clone)]
1983pub struct TableStats {
1984 pub table_name: String,
1985 pub item_count: i64,
1986 pub size_bytes: u64,
1987}
1988
1989#[derive(Debug, Clone)]
1991pub struct DatabaseInfo {
1992 pub path: Option<String>,
1993 pub size_bytes: u64,
1994 pub table_count: usize,
1995 pub tables: Vec<TableInfoEntry>,
1996}
1997
1998#[derive(Debug, Clone)]
2000pub struct TableInfoEntry {
2001 pub stats: TableStats,
2002 pub metadata: Option<TableMetadata>,
2003}
2004
2005pub(crate) fn escape_table_name(name: &str) -> String {
2007 name.replace('"', "\"\"")
2008}
2009
2010#[derive(Debug, Clone)]
2016pub struct TableMetadata {
2017 pub table_name: String,
2018 pub key_schema: String,
2019 pub attribute_definitions: String,
2020 pub gsi_definitions: Option<String>,
2021 pub lsi_definitions: Option<String>,
2022 pub stream_enabled: bool,
2023 pub stream_view_type: Option<String>,
2024 pub stream_label: Option<String>,
2025 pub ttl_attribute: Option<String>,
2026 pub ttl_enabled: bool,
2027 pub created_at: i64,
2028 pub table_status: String,
2029 pub billing_mode: Option<String>,
2030 pub provisioned_throughput: Option<String>,
2031 pub sse_specification: Option<String>,
2032 pub table_class: Option<String>,
2033 pub deletion_protection_enabled: bool,
2034}
2035
2036const TABLE_METADATA_COLUMNS: &str = "table_name, key_schema, attribute_definitions, gsi_definitions, \
2038 lsi_definitions, stream_enabled, stream_view_type, stream_label, ttl_attribute, ttl_enabled, \
2039 created_at, table_status, billing_mode, provisioned_throughput, \
2040 sse_specification, table_class, deletion_protection_enabled";
2041
2042fn row_to_metadata(row: &rusqlite::Row) -> rusqlite::Result<TableMetadata> {
2044 Ok(TableMetadata {
2045 table_name: row.get(0)?,
2046 key_schema: row.get(1)?,
2047 attribute_definitions: row.get(2)?,
2048 gsi_definitions: row.get(3)?,
2049 lsi_definitions: row.get(4)?,
2050 stream_enabled: row.get::<_, i32>(5)? != 0,
2051 stream_view_type: row.get(6)?,
2052 stream_label: row.get(7)?,
2053 ttl_attribute: row.get(8)?,
2054 ttl_enabled: row.get::<_, i32>(9)? != 0,
2055 created_at: row.get(10)?,
2056 table_status: row.get(11)?,
2057 billing_mode: row.get(12)?,
2058 provisioned_throughput: row.get(13)?,
2059 sse_specification: row.get(14)?,
2060 table_class: row.get(15)?,
2061 deletion_protection_enabled: row.get::<_, i32>(16).unwrap_or(0) != 0,
2062 })
2063}
2064
2065#[cfg(test)]
2066mod tests {
2067 use super::*;
2068
2069 fn test_storage() -> Storage {
2070 Storage::memory().expect("Failed to create in-memory storage")
2071 }
2072
2073 #[test]
2074 fn test_initialize_creates_metadata_tables() {
2075 let storage = test_storage();
2076 let version: String = storage
2078 .conn()
2079 .query_row(
2080 "SELECT value FROM _config WHERE key = 'schema_version'",
2081 [],
2082 |row| row.get(0),
2083 )
2084 .unwrap();
2085 assert_eq!(version, SCHEMA_VERSION);
2086 }
2087
2088 #[test]
2089 fn test_wal_mode_enabled() {
2090 let storage = test_storage();
2091 let mode: String = storage
2092 .conn()
2093 .query_row("PRAGMA journal_mode", [], |row| row.get(0))
2094 .unwrap();
2095 assert!(mode == "wal" || mode == "memory", "Got mode: {mode}");
2097 }
2098
2099 #[test]
2100 fn test_table_metadata_crud() {
2101 let storage = test_storage();
2102
2103 assert!(!storage.table_exists("TestTable").unwrap());
2105 assert!(storage.list_table_names().unwrap().is_empty());
2106
2107 storage
2109 .insert_table_metadata(&CreateTableMetadata {
2110 table_name: "TestTable",
2111 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2112 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2113 created_at: 1000000,
2114 ..Default::default()
2115 })
2116 .unwrap();
2117
2118 assert!(storage.table_exists("TestTable").unwrap());
2119 assert_eq!(storage.list_table_names().unwrap(), vec!["TestTable"]);
2120
2121 let meta = storage.get_table_metadata("TestTable").unwrap().unwrap();
2123 assert_eq!(meta.table_name, "TestTable");
2124 assert_eq!(meta.table_status, "ACTIVE");
2125 assert_eq!(meta.created_at, 1000000);
2126
2127 assert!(storage.delete_table_metadata("TestTable").unwrap());
2129 assert!(!storage.table_exists("TestTable").unwrap());
2130 }
2131
2132 #[test]
2133 fn test_create_and_drop_data_table() {
2134 let storage = test_storage();
2135 storage.create_data_table("MyTable").unwrap();
2136
2137 storage
2139 .put_item("MyTable", "pk1", "", r#"{"pk":{"S":"pk1"}}"#, 10)
2140 .unwrap();
2141
2142 let item = storage.get_item("MyTable", "pk1", "").unwrap();
2143 assert!(item.is_some());
2144
2145 storage.drop_data_table("MyTable").unwrap();
2146 }
2147
2148 #[test]
2149 fn test_item_crud() {
2150 let storage = test_storage();
2151 storage.create_data_table("Items").unwrap();
2152
2153 let old = storage
2155 .put_item(
2156 "Items",
2157 "user#1",
2158 "profile",
2159 r#"{"name":{"S":"Alice"}}"#,
2160 20,
2161 )
2162 .unwrap();
2163 assert!(old.is_none()); let item = storage.get_item("Items", "user#1", "profile").unwrap();
2167 assert_eq!(item.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2168
2169 let old = storage
2171 .put_item("Items", "user#1", "profile", r#"{"name":{"S":"Bob"}}"#, 18)
2172 .unwrap();
2173 assert_eq!(old.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2174
2175 let deleted = storage.delete_item("Items", "user#1", "profile").unwrap();
2177 assert_eq!(deleted.unwrap(), r#"{"name":{"S":"Bob"}}"#);
2178
2179 assert!(
2181 storage
2182 .get_item("Items", "user#1", "profile")
2183 .unwrap()
2184 .is_none()
2185 );
2186 }
2187
2188 #[test]
2189 fn test_query_items() {
2190 let storage = test_storage();
2191 storage.create_data_table("Orders").unwrap();
2192
2193 for i in 1..=5 {
2195 let sk = format!("order#{i:03}");
2196 let json = format!(r#"{{"id":{{"N":"{i}"}}}}"#);
2197 storage
2198 .put_item("Orders", "user#1", &sk, &json, 10)
2199 .unwrap();
2200 }
2201
2202 let results = storage
2204 .query_items(
2205 "Orders",
2206 "user#1",
2207 &QueryParams {
2208 forward: true,
2209 ..Default::default()
2210 },
2211 )
2212 .unwrap();
2213 assert_eq!(results.len(), 5);
2214 assert_eq!(results[0].1, "order#001"); let results = storage
2218 .query_items(
2219 "Orders",
2220 "user#1",
2221 &QueryParams {
2222 forward: true,
2223 limit: Some(2),
2224 ..Default::default()
2225 },
2226 )
2227 .unwrap();
2228 assert_eq!(results.len(), 2);
2229
2230 let results = storage
2232 .query_items(
2233 "Orders",
2234 "user#1",
2235 &QueryParams {
2236 forward: false,
2237 limit: Some(2),
2238 ..Default::default()
2239 },
2240 )
2241 .unwrap();
2242 assert_eq!(results.len(), 2);
2243 assert_eq!(results[0].1, "order#005"); }
2245
2246 #[test]
2247 fn test_scan_items() {
2248 let storage = test_storage();
2249 storage.create_data_table("ScanTest").unwrap();
2250
2251 storage.put_item("ScanTest", "a", "1", r#"{}"#, 2).unwrap();
2252 storage.put_item("ScanTest", "b", "2", r#"{}"#, 2).unwrap();
2253 storage.put_item("ScanTest", "c", "3", r#"{}"#, 2).unwrap();
2254
2255 let results = storage.scan_items("ScanTest", &Default::default()).unwrap();
2256 assert_eq!(results.len(), 3);
2257
2258 let results = storage
2260 .scan_items(
2261 "ScanTest",
2262 &ScanParams {
2263 limit: Some(2),
2264 ..Default::default()
2265 },
2266 )
2267 .unwrap();
2268 assert_eq!(results.len(), 2);
2269
2270 let results = storage
2272 .scan_items(
2273 "ScanTest",
2274 &ScanParams {
2275 limit: Some(2),
2276 exclusive_start_pk: Some("a"),
2277 exclusive_start_sk: Some("1"),
2278 ..Default::default()
2279 },
2280 )
2281 .unwrap();
2282 assert_eq!(results.len(), 2);
2283 assert_eq!(results[0].0, "b"); }
2285
2286 #[test]
2287 fn test_count_items() {
2288 let storage = test_storage();
2289 storage.create_data_table("CountTest").unwrap();
2290
2291 assert_eq!(storage.count_items("CountTest").unwrap(), 0);
2292
2293 storage.put_item("CountTest", "a", "", r#"{}"#, 2).unwrap();
2294 storage.put_item("CountTest", "b", "", r#"{}"#, 2).unwrap();
2295
2296 assert_eq!(storage.count_items("CountTest").unwrap(), 2);
2297 }
2298
2299 #[test]
2300 fn test_gsi_table_lifecycle() {
2301 let storage = test_storage();
2302 storage.create_gsi_table("Orders", "ByDate").unwrap();
2303
2304 let gsi_name = "Orders::gsi::ByDate";
2306 let sql = format!(
2307 "INSERT INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
2308 gsi_name.replace('"', "\"\"")
2309 );
2310 storage
2311 .conn()
2312 .execute(
2313 &sql,
2314 params!["2024-01-01", "001", "user#1", "order#001", r#"{}"#],
2315 )
2316 .unwrap();
2317
2318 storage.drop_gsi_table("Orders", "ByDate").unwrap();
2319 }
2320
2321 #[test]
2322 fn test_nonexistent_table_metadata() {
2323 let storage = test_storage();
2324 assert!(storage.get_table_metadata("Nonexistent").unwrap().is_none());
2325 assert!(!storage.delete_table_metadata("Nonexistent").unwrap());
2326 }
2327
2328 #[test]
2329 fn test_metadata_cache_hit() {
2330 let storage = test_storage();
2331 storage
2332 .insert_table_metadata(&CreateTableMetadata {
2333 table_name: "CacheTest",
2334 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2335 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2336 created_at: 1000000,
2337 ..Default::default()
2338 })
2339 .unwrap();
2340
2341 let meta1 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2343 assert_eq!(meta1.table_name, "CacheTest");
2344
2345 let meta2 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2347 assert_eq!(meta2.table_name, "CacheTest");
2348 assert_eq!(meta1.created_at, meta2.created_at);
2349
2350 assert!(storage.metadata_cache.borrow().contains_key("CacheTest"));
2352 }
2353
2354 #[test]
2355 fn test_metadata_cache_invalidated_on_delete() {
2356 let storage = test_storage();
2357 storage
2358 .insert_table_metadata(&CreateTableMetadata {
2359 table_name: "DelCache",
2360 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2361 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2362 created_at: 1000000,
2363 ..Default::default()
2364 })
2365 .unwrap();
2366
2367 storage.get_table_metadata("DelCache").unwrap();
2369 assert!(storage.metadata_cache.borrow().contains_key("DelCache"));
2370
2371 storage.delete_table_metadata("DelCache").unwrap();
2373 assert!(!storage.metadata_cache.borrow().contains_key("DelCache"));
2374 }
2375
2376 #[test]
2377 fn test_metadata_cache_invalidated_on_stream_enable() {
2378 let storage = test_storage();
2379 storage
2380 .insert_table_metadata(&CreateTableMetadata {
2381 table_name: "StreamCache",
2382 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2383 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2384 created_at: 1000000,
2385 ..Default::default()
2386 })
2387 .unwrap();
2388
2389 let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2391 assert!(!meta.stream_enabled);
2392
2393 storage
2395 .enable_stream("StreamCache", "NEW_AND_OLD_IMAGES", "2024-01-01T00:00:00")
2396 .unwrap();
2397 assert!(!storage.metadata_cache.borrow().contains_key("StreamCache"));
2398
2399 let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2401 assert!(meta.stream_enabled);
2402 }
2403
2404 #[test]
2405 fn test_metadata_cache_invalidated_on_ttl_update() {
2406 let storage = test_storage();
2407 storage
2408 .insert_table_metadata(&CreateTableMetadata {
2409 table_name: "TtlCache",
2410 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2411 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2412 created_at: 1000000,
2413 ..Default::default()
2414 })
2415 .unwrap();
2416
2417 let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2419 assert!(!meta.ttl_enabled);
2420
2421 storage
2423 .update_ttl_config("TtlCache", Some("expires_at"), true)
2424 .unwrap();
2425 assert!(!storage.metadata_cache.borrow().contains_key("TtlCache"));
2426
2427 let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2429 assert!(meta.ttl_enabled);
2430 assert_eq!(meta.ttl_attribute, Some("expires_at".to_string()));
2431 }
2432
2433 #[test]
2434 fn test_num_to_buffer_zero() {
2435 assert_eq!(num_to_buffer("0"), vec![0x80]);
2437 assert_eq!(num_to_buffer("-0"), vec![0x80]);
2438 }
2439
2440 #[test]
2441 fn test_hash_prefix_string_keys() {
2442 let h1 = compute_hash_prefix(&AttributeValue::S("3635".into()));
2445 let h2 = compute_hash_prefix(&AttributeValue::S("228".into()));
2446 let h3 = compute_hash_prefix(&AttributeValue::S("1668".into()));
2447 let h4 = compute_hash_prefix(&AttributeValue::S("3435".into()));
2448
2449 assert_eq!(
2452 hash_bucket(&h1),
2453 0,
2454 "3635 should be bucket 0, got hash {h1}"
2455 );
2456 assert_eq!(hash_bucket(&h2), 0, "228 should be bucket 0, got hash {h2}");
2457
2458 assert_eq!(
2460 hash_bucket(&h3),
2461 1,
2462 "1668 should be bucket 1, got hash {h3}"
2463 );
2464
2465 assert_eq!(
2467 hash_bucket(&h4),
2468 4,
2469 "3435 should be bucket 4, got hash {h4}"
2470 );
2471 }
2472
2473 #[test]
2474 fn test_hash_prefix_number_keys() {
2475 let h1 = compute_hash_prefix(&AttributeValue::N("251".into()));
2478 assert_eq!(hash_bucket(&h1), 1, "251 should be bucket 1, got hash {h1}");
2479
2480 let h2 = compute_hash_prefix(&AttributeValue::N("2388".into()));
2482 assert_eq!(
2483 hash_bucket(&h2),
2484 4095,
2485 "2388 should be bucket 4095, got hash {h2}"
2486 );
2487 }
2488
2489 #[test]
2490 fn test_hash_in_segment() {
2491 assert!(hash_in_segment("000000", 0, 4096));
2493 assert!(!hash_in_segment("000000", 1, 4096));
2494
2495 assert!(hash_in_segment("001000", 1, 4096));
2497 assert!(!hash_in_segment("001000", 0, 4096));
2498
2499 assert!(hash_in_segment("fff000", 4095, 4096));
2501 assert!(!hash_in_segment("fff000", 0, 4096));
2502
2503 assert!(hash_in_segment("000000", 0, 2));
2505 assert!(hash_in_segment("7ff000", 0, 2));
2506 assert!(hash_in_segment("800000", 1, 2));
2507 assert!(hash_in_segment("fff000", 1, 2));
2508 }
2509}