1#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
2use crate::errors::{DynoxideError, Result};
3#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
4use crate::storage_backend::clock::{Clock, SystemClock};
5#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
6use crate::storage_backend::sql_builders::{self, escape_table_name};
7use crate::types::AttributeValue;
8#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
9use rusqlite::{Connection, params};
10#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
11use std::{cell::RefCell, collections::HashMap, sync::Arc};
12
/// Current schema version. `Storage::initialize` writes it (as text) under the
/// `schema_version` key of the `_config` table when that key does not exist
/// yet, and runs every migration step whose target version is above the
/// stored value.
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
const SCHEMA_VERSION: &str = "8";

/// Number of hash buckets that `hash_in_segment` distributes over scan
/// segments. 4096 = 16^3, i.e. one bucket per value of the three-hex-digit
/// prefix parsed by `hash_bucket`.
pub(crate) const HASH_BUCKETS: u32 = 4096;
20
21pub fn compute_hash_prefix(pk_value: &AttributeValue) -> String {
35 let key_bytes = match pk_value {
36 AttributeValue::S(s) => s.as_bytes().to_vec(),
37 AttributeValue::N(n) => num_to_buffer(n),
38 AttributeValue::B(b) => b.clone(),
39 _ => vec![], };
41
42 let digest = md5::compute([b"Outliers" as &[u8], &key_bytes].concat());
43 format!("{:032x}", digest)[..6].to_string()
44}
45
/// Maps a hash prefix to its bucket number.
///
/// The first three bytes of `hash_prefix` (or the whole string when it is
/// shorter) are parsed as a hexadecimal number, giving a value in
/// `0..=0xfff` for well-formed prefixes.
///
/// Returns `0` when the prefix is empty, is not valid hexadecimal, or when
/// the three-byte cut would fall inside a multi-byte UTF-8 character (the
/// previous byte-slice indexing panicked in that last case).
pub fn hash_bucket(hash_prefix: &str) -> u32 {
    let end = hash_prefix.len().min(3);
    hash_prefix
        // `get` yields `None` instead of panicking off a char boundary.
        .get(..end)
        .and_then(|prefix| u32::from_str_radix(prefix, 16).ok())
        .unwrap_or(0)
}
51
/// Encodes a decimal number string into the byte sequence that
/// `compute_hash_prefix` hashes for numeric keys.
///
/// Layout produced by the code below:
/// * byte 0 is `floor((exponent + pad) / 2) - 64` truncated to `u8`, with all
///   bits inverted for negative values;
/// * the following bytes hold the mantissa, two decimal digits per byte,
///   stored as `pair + 1` for positive values and `101 - pair` for negative
///   values;
/// * negative values whose mantissa needs fewer than 20 bytes end with an
///   extra trailing byte `102`.
///
/// Empty, unparseable and zero inputs all encode as the single byte `0x80`.
// NOTE(review): this layout looks like Oracle's NUMBER encoding — confirm
// against the reference implementation the hash is meant to match.
fn num_to_buffer(num_str: &str) -> Vec<u8> {
    let trimmed = num_str.trim();
    if trimmed.is_empty() {
        return vec![0x80];
    }

    use bigdecimal::BigDecimal;
    use std::str::FromStr;

    // Anything that does not parse as a decimal is treated like zero.
    let bd = match BigDecimal::from_str(trimmed) {
        Ok(v) => v,
        Err(_) => return vec![0x80],
    };

    // Zero has no mantissa digits to encode.
    if bd.sign() == bigdecimal::num_bigint::Sign::NoSign {
        return vec![0x80];
    }

    let is_negative = bd.sign() == bigdecimal::num_bigint::Sign::Minus;
    let bd_abs = if is_negative { -&bd } else { bd.clone() };

    let (mantissa, exponent) = extract_mantissa_and_exponent(&bd_abs);
    if mantissa.is_empty() {
        return vec![0x80];
    }

    // With an odd decimal exponent a virtual leading zero digit is prepended
    // so that the digits pair up on base-100 boundaries.
    let append_zero: i64 = if exponent % 2 != 0 { 1 } else { 0 };
    // Number of mantissa bytes (two digits per byte, rounded up).
    let byte_len_no_exp = ((mantissa.len() as i64 + append_zero + 1) / 2) as usize;

    let mut byte_array: Vec<u8>;
    if byte_len_no_exp < 20 && is_negative {
        // Exponent byte + mantissa bytes + trailing terminator (102).
        byte_array = vec![0u8; byte_len_no_exp + 2];
        byte_array[byte_len_no_exp + 1] = 102;
    } else {
        // Exponent byte + mantissa bytes.
        byte_array = vec![0u8; byte_len_no_exp + 1];
    }

    let exp_sum = exponent + append_zero;
    let exp_byte_val = floor_div(exp_sum, 2) - 64;
    if is_negative {
        // `x ^ !0` flips every bit; the cast keeps the low eight bits.
        byte_array[0] = (exp_byte_val ^ !0i64) as u8;
    } else {
        byte_array[0] = exp_byte_val as u8;
    }

    // `mi` indexes mantissa digits; `bai` is the output byte receiving them.
    let mut mi: i64 = 0;
    let mlen = mantissa.len() as i64;
    let mut appended_zero = false;

    while mi < mlen {
        let bai = ((mi + append_zero) / 2 + 1) as usize;
        if append_zero != 0 && mi == 0 && !appended_zero {
            // Emit the virtual leading zero once, then step `mi` back so the
            // `mi += 1` below lands on digit 0 again and it becomes the low
            // digit of this same byte.
            byte_array[bai] = 0;
            appended_zero = true;
            mi -= 1;
        } else if (mi + append_zero) % 2 == 0 {
            // High digit of the pair.
            byte_array[bai] = mantissa[mi as usize] * 10;
        } else {
            // Low digit of the pair.
            byte_array[bai] += mantissa[mi as usize];
        }

        // A byte is complete after its low digit, or at the last digit.
        if ((mi + append_zero) % 2 != 0) || (mi == mlen - 1) {
            if is_negative {
                byte_array[bai] = 101u8.wrapping_sub(byte_array[bai]);
            } else {
                byte_array[bai] = byte_array[bai].wrapping_add(1);
            }
        }

        mi += 1;
    }

    byte_array
}
141
/// Integer division of `a` by `b` rounded toward negative infinity.
///
/// Rust's `/` truncates toward zero, so the quotient is lowered by one
/// whenever the division is inexact and the remainder's sign differs from
/// the divisor's. Panics when `b` is zero, like `/` itself.
fn floor_div(a: i64, b: i64) -> i64 {
    let quotient = a / b;
    let remainder = a % b;
    let signs_differ = (remainder < 0) != (b < 0);
    if remainder != 0 && signs_differ {
        quotient - 1
    } else {
        quotient
    }
}
148
149fn extract_mantissa_and_exponent(bd: &bigdecimal::BigDecimal) -> (Vec<u8>, i64) {
156 let normalized = bd.normalized();
158
159 let (bigint, scale) = normalized.as_bigint_and_exponent();
162 let digits_str = bigint.to_string();
163 let digits_str = digits_str.trim_start_matches('-');
164
165 let digits: Vec<u8> = digits_str
166 .chars()
167 .map(|c| c.to_digit(10).unwrap() as u8)
168 .collect();
169
170 let exponent = digits.len() as i64 - scale;
173
174 (digits, exponent)
175}
176
177pub fn hash_in_segment(hash_prefix: &str, segment: u32, total_segments: u32) -> bool {
184 let bucket = hash_bucket(hash_prefix);
185 let start = ceiling_div(HASH_BUCKETS * segment, total_segments);
186 let end = ceiling_div(HASH_BUCKETS * (segment + 1), total_segments) - 1;
187 bucket >= start && bucket <= end
188}
189
/// Divides `a` by `b`, rounding any inexact result up to the next integer.
///
/// Panics when `b` is zero.
pub(crate) fn ceiling_div(a: u32, b: u32) -> u32 {
    let whole = a / b;
    if a % b == 0 {
        whole
    } else {
        // A non-zero remainder implies `b >= 2`, so `whole + 1` cannot overflow.
        whole + 1
    }
}
193
/// Options accepted by the scan methods (`scan_items`, `scan_gsi_items`,
/// `scan_lsi_items`), which hand them to the matching `sql_builders`
/// function to build the SQL statement.
///
/// Every field is optional; `Default` leaves all of them unset.
// NOTE(review): the per-field notes below are inferred from the field names
// only — the authoritative semantics live in `sql_builders`; confirm there.
#[derive(Debug, Default)]
pub struct ScanParams<'a> {
    /// Presumably the maximum number of rows to return.
    pub limit: Option<usize>,
    /// Presumably the partition key to resume after (exclusive).
    pub exclusive_start_pk: Option<&'a str>,
    /// Presumably the sort key to resume after (exclusive).
    pub exclusive_start_sk: Option<&'a str>,
    /// Presumably the zero-based segment number of a segmented scan.
    pub segment: Option<u32>,
    /// Presumably the total number of segments of a segmented scan.
    pub total_segments: Option<u32>,
    /// Presumably the base-table partition key to resume after when
    /// scanning an index.
    pub exclusive_start_base_pk: Option<&'a str>,
    /// Presumably the base-table sort key to resume after when scanning an
    /// index.
    pub exclusive_start_base_sk: Option<&'a str>,
}
207
/// Borrowed description of a table, passed to
/// `Storage::insert_table_metadata`, which forwards it to
/// `sql_builders::insert_table_metadata` to build the INSERT statement.
// NOTE(review): most `&str` fields presumably carry JSON-serialised
// definitions (the GSI/LSI columns are parsed as JSON arrays by the v5
// migration) — confirm the exact formats against the callers.
#[derive(Debug, Default)]
pub struct CreateTableMetadata<'a> {
    /// Name of the table; also the key used for the metadata cache.
    pub table_name: &'a str,
    /// Serialised key schema (format not visible here).
    pub key_schema: &'a str,
    /// Serialised attribute definitions (format not visible here).
    pub attribute_definitions: &'a str,
    /// Serialised global secondary index definitions, if any.
    pub gsi_definitions: Option<&'a str>,
    /// Serialised local secondary index definitions, if any.
    pub lsi_definitions: Option<&'a str>,
    /// Serialised provisioned throughput settings, if any.
    pub provisioned_throughput: Option<&'a str>,
    /// Creation timestamp (unit not visible here — TODO confirm).
    pub created_at: i64,
    /// Serialised SSE specification, if any.
    pub sse_specification: Option<&'a str>,
    /// Table class, if any.
    pub table_class: Option<&'a str>,
    /// Whether deletion protection is switched on.
    pub deletion_protection_enabled: bool,
    /// Billing mode, if any.
    pub billing_mode: Option<&'a str>,
    /// Serialised on-demand throughput settings, if any.
    pub on_demand_throughput: Option<&'a str>,
}
224
/// Options accepted by the query methods (`query_items`, `query_gsi_items`,
/// `query_lsi_items`), which hand them to the matching `sql_builders`
/// function to build the SQL statement.
///
/// Note that `Default` sets `forward` to `false`.
// NOTE(review): the per-field notes below are inferred from the field names
// only — the authoritative semantics live in `sql_builders`; confirm there.
#[derive(Debug, Default)]
pub struct QueryParams<'a> {
    /// Presumably a SQL fragment constraining the sort key.
    pub sk_condition: Option<&'a str>,
    /// Presumably the values bound to the placeholders of `sk_condition`.
    pub sk_params: &'a [&'a str],
    /// Presumably `true` for ascending sort-key order.
    pub forward: bool,
    /// Presumably the maximum number of rows to return.
    pub limit: Option<usize>,
    /// Presumably the sort key to resume after (exclusive).
    pub exclusive_start_sk: Option<&'a str>,
    /// Presumably the base-table partition key to resume after when
    /// querying an index.
    pub exclusive_start_base_pk: Option<&'a str>,
    /// Presumably the base-table sort key to resume after when querying an
    /// index.
    pub exclusive_start_base_sk: Option<&'a str>,
}
238
/// SQLite-backed storage handle.
///
/// Wraps a single `rusqlite` connection together with a per-instance cache
/// of table metadata and the clock handed out by `Storage::clock`.
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
pub struct Storage {
    /// The underlying SQLite connection.
    conn: Connection,
    /// Table metadata keyed by table name; filled by `get_table_metadata`
    /// and evicted by every method that changes a table's metadata row.
    /// `RefCell` lets `&self` methods update it.
    metadata_cache: RefCell<HashMap<String, TableMetadata>>,
    /// Clock implementation; `SystemClock` unless replaced via `with_clock`.
    clock: Arc<dyn Clock>,
}
255
256#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
257impl Storage {
258 pub fn new(path: &str) -> Result<Self> {
260 let conn = Connection::open(path)?;
261 let mut storage = Self {
262 conn,
263 metadata_cache: RefCell::new(HashMap::new()),
264 clock: Arc::new(SystemClock),
265 };
266 storage.initialize().map_err(Self::maybe_encrypted_error)?;
267 Ok(storage)
268 }
269
    /// Builder-style setter: replaces the clock of this storage with `clock`
    /// and returns the storage.
    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
        self.clock = clock;
        self
    }
279
    /// Borrows the clock currently installed on this storage.
    pub(crate) fn clock(&self) -> &dyn Clock {
        self.clock.as_ref()
    }
284
285 fn maybe_encrypted_error(err: DynoxideError) -> DynoxideError {
288 if let DynoxideError::SqliteError(ref sqlite_err) = err {
289 if let Some(rusqlite::ErrorCode::NotADatabase) = sqlite_err.sqlite_error_code() {
290 return DynoxideError::InternalServerError(
291 "Database file is encrypted or not a valid SQLite database. \
292 If encrypted, enable the `encryption` or `encryption-cc` feature \
293 and use Database::new_encrypted() with the correct key."
294 .to_string(),
295 );
296 }
297 }
298 err
299 }
300
301 #[cfg(feature = "_has-encryption")]
307 pub fn new_encrypted(path: &str, key: &str) -> Result<Self> {
308 use zeroize::Zeroize;
309
310 let conn = Connection::open(path)?;
311 let mut pragma_val = format!("x'{key}'");
316 conn.pragma_update(None, "key", &pragma_val)?;
317 pragma_val.zeroize();
318 conn.execute_batch("SELECT count(*) FROM sqlite_master;")?;
320 let mut storage = Self {
321 conn,
322 metadata_cache: RefCell::new(HashMap::new()),
323 clock: Arc::new(SystemClock),
324 };
325 storage.initialize()?;
326 Ok(storage)
327 }
328
329 pub fn memory() -> Result<Self> {
331 let conn = Connection::open_in_memory()?;
332 let mut storage = Self {
333 conn,
334 metadata_cache: RefCell::new(HashMap::new()),
335 clock: Arc::new(SystemClock),
336 };
337 storage.initialize()?;
338 Ok(storage)
339 }
340
341 fn initialize(&mut self) -> Result<()> {
343 self.conn.pragma_update(None, "journal_mode", "WAL")?;
345
346 self.conn.create_scalar_function(
349 "fnv1a_hash",
350 1,
351 rusqlite::functions::FunctionFlags::SQLITE_DETERMINISTIC
352 | rusqlite::functions::FunctionFlags::SQLITE_UTF8,
353 |ctx: &rusqlite::functions::Context| -> rusqlite::Result<i64> {
354 let pk_ref = ctx.get_raw(0);
355 let pk_bytes = match pk_ref {
356 rusqlite::types::ValueRef::Text(bytes) => bytes,
357 _ => {
358 return Err(rusqlite::Error::InvalidFunctionParameterType(
359 0,
360 rusqlite::types::Type::Text,
361 ));
362 }
363 };
364 let mut hash: u32 = 2166136261;
365 for &byte in pk_bytes {
366 hash ^= byte as u32;
367 hash = hash.wrapping_mul(16777619);
368 }
369 Ok(hash as i64)
370 },
371 )?;
372
373 self.conn.execute_batch(sql_builders::INIT_SCHEMA)?;
375
376 let _ = self
378 .conn
379 .execute_batch("ALTER TABLE _stream_records ADD COLUMN user_identity TEXT");
380
381 self.conn.execute(
383 "INSERT OR IGNORE INTO _config (key, value) VALUES ('schema_version', ?1)",
384 params![SCHEMA_VERSION],
385 )?;
386
387 let version: i32 = self
389 .conn
390 .query_row(
391 "SELECT value FROM _config WHERE key = 'schema_version'",
392 [],
393 |r| r.get::<_, String>(0),
394 )
395 .unwrap_or_else(|_| "1".to_string())
396 .parse()
397 .unwrap_or(1);
398
399 if version < 2 {
400 self.migrate_v1_to_v2()?;
401 }
402 if version < 3 {
403 self.migrate_v2_to_v3()?;
404 }
405 if version < 4 {
406 self.migrate_v3_to_v4()?;
407 }
408 if version < 5 {
409 self.migrate_v4_to_v5()?;
410 }
411 if version < 6 {
412 self.migrate_v5_to_v6()?;
413 }
414 if version < 7 {
415 self.migrate_v6_to_v7()?;
416 }
417 if version < 8 {
418 self.migrate_v7_to_v8()?;
419 }
420
421 Ok(())
422 }
423
424 fn migrate_v1_to_v2(&self) -> Result<()> {
426 let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
427 let table_names: Vec<String> = stmt
428 .query_map([], |row| row.get(0))?
429 .collect::<std::result::Result<Vec<_>, _>>()?;
430
431 for table_name in &table_names {
432 let escaped = format!("\"{}\"", table_name.replace('"', "\"\""));
433 let _ = self.conn.execute(
434 &format!("ALTER TABLE {escaped} ADD COLUMN cached_at REAL"),
435 [],
436 );
437 }
438
439 self.conn.execute(
440 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '2')",
441 [],
442 )?;
443
444 Ok(())
445 }
446
447 fn migrate_v2_to_v3(&self) -> Result<()> {
449 let _ = self
450 .conn
451 .execute("ALTER TABLE _tables ADD COLUMN tags TEXT", []);
452
453 self.conn.execute(
454 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '3')",
455 [],
456 )?;
457
458 Ok(())
459 }
460
461 fn migrate_v3_to_v4(&self) -> Result<()> {
463 let _ = self
464 .conn
465 .execute("ALTER TABLE _tables ADD COLUMN sse_specification TEXT", []);
466 let _ = self
467 .conn
468 .execute("ALTER TABLE _tables ADD COLUMN table_class TEXT", []);
469 let _ = self.conn.execute(
470 "ALTER TABLE _tables ADD COLUMN deletion_protection_enabled INTEGER DEFAULT 0",
471 [],
472 );
473
474 self.conn.execute(
475 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '4')",
476 [],
477 )?;
478
479 Ok(())
480 }
481
482 fn migrate_v4_to_v5(&self) -> Result<()> {
484 let mut stmt = self
485 .conn
486 .prepare("SELECT table_name, gsi_definitions, lsi_definitions FROM _tables")?;
487 let tables: Vec<(String, Option<String>, Option<String>)> = stmt
488 .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
489 .collect::<std::result::Result<Vec<_>, _>>()?;
490
491 for (table_name, gsi_json, lsi_json) in &tables {
492 if let Some(json) = gsi_json {
493 if let Ok(gsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
494 for gsi in &gsis {
495 if let Some(idx) = gsi.get("IndexName").and_then(|v| v.as_str()) {
496 let gsi_table = escape_table_name(&format!("{table_name}::gsi::{idx}"));
497 let idx_name =
498 escape_table_name(&format!("{table_name}::gsi::{idx}::base_key"));
499 let _ = self.conn.execute_batch(&format!(
500 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{gsi_table}\" (table_pk, table_sk)"
501 ));
502 }
503 }
504 }
505 }
506 if let Some(json) = lsi_json {
507 if let Ok(lsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
508 for lsi in &lsis {
509 if let Some(idx) = lsi.get("IndexName").and_then(|v| v.as_str()) {
510 let lsi_table = escape_table_name(&format!("{table_name}::lsi::{idx}"));
511 let idx_name =
512 escape_table_name(&format!("{table_name}::lsi::{idx}::base_key"));
513 let _ = self.conn.execute_batch(&format!(
514 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{lsi_table}\" (base_pk, base_sk)"
515 ));
516 }
517 }
518 }
519 }
520 }
521
522 self.conn.execute(
523 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '5')",
524 [],
525 )?;
526
527 Ok(())
528 }
529
530 fn migrate_v5_to_v6(&self) -> Result<()> {
532 let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
533 let table_names: Vec<String> = stmt
534 .query_map([], |row| row.get(0))?
535 .collect::<std::result::Result<Vec<_>, _>>()?;
536
537 for table_name in &table_names {
538 let escaped = escape_table_name(table_name);
539 let _ = self.conn.execute(
540 &format!(
541 "ALTER TABLE \"{escaped}\" ADD COLUMN hash_prefix TEXT NOT NULL DEFAULT ''"
542 ),
543 [],
544 );
545 }
546
547 self.conn.execute(
548 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '6')",
549 [],
550 )?;
551
552 Ok(())
553 }
554
555 fn migrate_v6_to_v7(&self) -> Result<()> {
557 let _ = self.conn.execute(
558 "ALTER TABLE _tables ADD COLUMN on_demand_throughput TEXT",
559 [],
560 );
561
562 self.conn.execute(
563 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '7')",
564 [],
565 )?;
566
567 Ok(())
568 }
569
570 fn migrate_v7_to_v8(&self) -> Result<()> {
581 let _ = self
582 .conn
583 .execute("ALTER TABLE _tables ADD COLUMN table_id TEXT", []);
584
585 let names: Vec<String> = {
586 let mut stmt = self
587 .conn
588 .prepare("SELECT table_name FROM _tables WHERE table_id IS NULL")?;
589 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
590 rows.collect::<rusqlite::Result<Vec<String>>>()?
591 };
592 for name in names {
593 let id = uuid::Uuid::new_v4().to_string();
594 self.conn.execute(
595 "UPDATE _tables SET table_id = ?1 WHERE table_name = ?2",
596 params![id, name],
597 )?;
598 }
599
600 self.conn.execute(
601 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '8')",
602 [],
603 )?;
604
605 Ok(())
606 }
607
    /// Borrows the underlying SQLite connection.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }
612
    /// Mutably borrows the underlying SQLite connection.
    pub fn conn_mut(&mut self) -> &mut Connection {
        &mut self.conn
    }
617
618 pub fn insert_table_metadata(&self, m: &CreateTableMetadata) -> Result<()> {
624 let table_name = m.table_name;
625 let (sql, params) = sql_builders::insert_table_metadata(m);
626 self.conn
627 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
628 self.metadata_cache.borrow_mut().remove(table_name);
629 Ok(())
630 }
631
632 pub fn get_table_metadata(&self, table_name: &str) -> Result<Option<TableMetadata>> {
638 if let Some(cached) = self.metadata_cache.borrow().get(table_name) {
640 return Ok(Some(cached.clone()));
641 }
642
643 let (sql, params) = sql_builders::get_table_metadata(table_name);
644 let mut stmt = self.conn.prepare(&sql)?;
645
646 let result = stmt.query_row(rusqlite::params_from_iter(params.iter()), row_to_metadata);
647
648 match result {
649 Ok(meta) => {
650 self.metadata_cache
651 .borrow_mut()
652 .insert(table_name.to_string(), meta.clone());
653 Ok(Some(meta))
654 }
655 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
656 Err(e) => Err(DynoxideError::from(e)),
657 }
658 }
659
660 pub fn delete_table_metadata(&self, table_name: &str) -> Result<bool> {
662 let (sql, params) = sql_builders::delete_table_metadata(table_name);
663 let affected = self
664 .conn
665 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
666 self.metadata_cache.borrow_mut().remove(table_name);
667 Ok(affected > 0)
668 }
669
670 pub fn update_table_metadata(
672 &self,
673 table_name: &str,
674 attribute_definitions: &str,
675 gsi_definitions: Option<&str>,
676 ) -> Result<()> {
677 let (sql, params) =
678 sql_builders::update_table_metadata(table_name, attribute_definitions, gsi_definitions);
679 self.conn
680 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
681 self.metadata_cache.borrow_mut().remove(table_name);
682 Ok(())
683 }
684
685 pub fn update_provisioned_throughput(
687 &self,
688 table_name: &str,
689 provisioned_throughput: &str,
690 ) -> Result<()> {
691 let (sql, params) =
692 sql_builders::update_provisioned_throughput(table_name, provisioned_throughput);
693 self.conn
694 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
695 self.metadata_cache.borrow_mut().remove(table_name);
696 Ok(())
697 }
698
699 pub fn clear_provisioned_throughput(&self, table_name: &str) -> Result<()> {
701 let (sql, params) = sql_builders::clear_provisioned_throughput(table_name);
702 self.conn
703 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
704 self.metadata_cache.borrow_mut().remove(table_name);
705 Ok(())
706 }
707
708 pub fn update_billing_mode(&self, table_name: &str, billing_mode: &str) -> Result<()> {
710 let (sql, params) = sql_builders::update_billing_mode(table_name, billing_mode);
711 self.conn
712 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
713 self.metadata_cache.borrow_mut().remove(table_name);
714 Ok(())
715 }
716
717 pub fn update_table_class(&self, table_name: &str, table_class: &str) -> Result<()> {
719 let (sql, params) = sql_builders::update_table_class(table_name, table_class);
720 self.conn
721 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
722 self.metadata_cache.borrow_mut().remove(table_name);
723 Ok(())
724 }
725
726 pub fn update_on_demand_throughput(
728 &self,
729 table_name: &str,
730 on_demand_throughput: &str,
731 ) -> Result<()> {
732 let (sql, params) =
733 sql_builders::update_on_demand_throughput(table_name, on_demand_throughput);
734 self.conn
735 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
736 self.metadata_cache.borrow_mut().remove(table_name);
737 Ok(())
738 }
739
740 pub fn get_tags(&self, table_name: &str) -> Result<Vec<crate::types::Tag>> {
746 let tags_json: Option<String> = self.conn.query_row(
747 "SELECT tags FROM _tables WHERE table_name = ?1",
748 params![table_name],
749 |row| row.get(0),
750 )?;
751
752 match tags_json {
753 Some(json) => serde_json::from_str(&json)
754 .map_err(|e| DynoxideError::InternalServerError(format!("Bad tags JSON: {e}"))),
755 None => Ok(Vec::new()),
756 }
757 }
758
759 pub fn set_tags(&self, table_name: &str, new_tags: &[crate::types::Tag]) -> Result<()> {
761 use std::collections::BTreeMap;
762
763 let existing = self.get_tags(table_name)?;
764 let mut tag_map: BTreeMap<String, String> =
765 existing.into_iter().map(|t| (t.key, t.value)).collect();
766
767 for tag in new_tags {
768 tag_map.insert(tag.key.clone(), tag.value.clone());
769 }
770
771 if tag_map.len() > 50 {
772 return Err(DynoxideError::ValidationException(
773 "One or more parameter values were invalid: \
774 Too many tags: tag limit is 50"
775 .to_string(),
776 ));
777 }
778
779 let merged: Vec<crate::types::Tag> = tag_map
780 .into_iter()
781 .map(|(k, v)| crate::types::Tag { key: k, value: v })
782 .collect();
783
784 let json = serde_json::to_string(&merged)
785 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
786
787 self.conn.execute(
788 "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
789 params![json, table_name],
790 )?;
791 Ok(())
792 }
793
794 pub fn update_deletion_protection(&self, table_name: &str, enabled: bool) -> Result<()> {
796 let (sql, params) = sql_builders::update_deletion_protection(table_name, enabled);
797 self.conn
798 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
799 self.metadata_cache.borrow_mut().remove(table_name);
800 Ok(())
801 }
802
803 pub fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<()> {
805 let mut tags = self.get_tags(table_name)?;
806 tags.retain(|t| !keys.contains(&t.key));
807
808 let json = if tags.is_empty() {
809 None
810 } else {
811 Some(
812 serde_json::to_string(&tags)
813 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
814 )
815 };
816
817 self.conn.execute(
818 "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
819 params![json, table_name],
820 )?;
821 Ok(())
822 }
823
824 pub fn list_table_names(&self) -> Result<Vec<String>> {
826 let (sql, params) = sql_builders::list_table_names();
827 let mut stmt = self.conn.prepare(&sql)?;
828 let names = stmt
829 .query_map(rusqlite::params_from_iter(params.iter()), |row| row.get(0))?
830 .collect::<std::result::Result<Vec<String>, _>>()?;
831 Ok(names)
832 }
833
834 pub fn table_exists(&self, table_name: &str) -> Result<bool> {
836 let (sql, params) = sql_builders::table_exists(table_name);
837 let count: i32 =
838 self.conn
839 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
840 row.get(0)
841 })?;
842 Ok(count > 0)
843 }
844
    /// Evicts the cached metadata of `table_name`, if any, so the next
    /// `get_table_metadata` call reloads it from the database.
    #[allow(dead_code)]
    pub(crate) fn invalidate_metadata_cache(&self, table_name: &str) {
        self.metadata_cache.borrow_mut().remove(table_name);
    }
850
851 pub fn create_data_table(&self, table_name: &str) -> Result<()> {
857 let (sql, params) = sql_builders::create_data_table(table_name);
858 self.conn
859 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
860 Ok(())
861 }
862
863 pub fn drop_data_table(&self, table_name: &str) -> Result<()> {
865 let (sql, params) = sql_builders::drop_data_table(table_name);
866 self.conn
867 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
868 Ok(())
869 }
870
871 pub fn create_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
873 let (sql, _) = sql_builders::create_gsi_table(table_name, index_name);
874 self.conn.execute_batch(&sql)?;
875 Ok(())
876 }
877
878 pub fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
880 let (sql, params) = sql_builders::drop_gsi_table(table_name, index_name);
881 self.conn
882 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
883 Ok(())
884 }
885
886 #[allow(clippy::too_many_arguments)]
892 pub fn insert_gsi_item(
893 &self,
894 table_name: &str,
895 index_name: &str,
896 gsi_pk: &str,
897 gsi_sk: &str,
898 table_pk: &str,
899 table_sk: &str,
900 item_json: &str,
901 ) -> Result<()> {
902 let sql = sql_builders::gsi_insert_sql(table_name, index_name);
903 let params = sql_builders::gsi_insert_params(gsi_pk, gsi_sk, table_pk, table_sk, item_json);
904 self.conn
905 .prepare_cached(&sql)?
906 .execute(rusqlite::params_from_iter(params.iter()))?;
907 Ok(())
908 }
909
910 pub fn insert_gsi_items(
914 &self,
915 table_name: &str,
916 index_name: &str,
917 rows: &[crate::storage_backend::GsiItemRow],
918 ) -> Result<()> {
919 let sql = sql_builders::gsi_insert_sql(table_name, index_name);
920 let mut stmt = self.conn.prepare_cached(&sql)?;
921 for row in rows {
922 let params = sql_builders::gsi_insert_params(
923 &row.gsi_pk,
924 &row.gsi_sk,
925 &row.table_pk,
926 &row.table_sk,
927 &row.item_json,
928 );
929 stmt.execute(rusqlite::params_from_iter(params.iter()))?;
930 }
931 Ok(())
932 }
933
934 pub fn delete_gsi_item(
936 &self,
937 table_name: &str,
938 index_name: &str,
939 table_pk: &str,
940 table_sk: &str,
941 ) -> Result<()> {
942 let (sql, params) =
943 sql_builders::delete_gsi_item(table_name, index_name, table_pk, table_sk);
944 self.conn
945 .prepare_cached(&sql)?
946 .execute(rusqlite::params_from_iter(params.iter()))?;
947 Ok(())
948 }
949
950 pub fn query_gsi_items(
952 &self,
953 table_name: &str,
954 index_name: &str,
955 gsi_pk: &str,
956 params: &QueryParams,
957 ) -> Result<Vec<(String, String, String)>> {
958 let (sql, params_vec) =
959 sql_builders::query_gsi_items(table_name, index_name, gsi_pk, params);
960 let mut stmt = self.conn.prepare(&sql)?;
961 let rows = stmt
962 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
963 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
964 })?
965 .collect::<std::result::Result<Vec<_>, _>>()?;
966
967 Ok(rows)
968 }
969
970 pub fn scan_gsi_items(
972 &self,
973 table_name: &str,
974 index_name: &str,
975 params: &ScanParams,
976 ) -> Result<Vec<(String, String, String)>> {
977 let (sql, params_vec) = sql_builders::scan_gsi_items(table_name, index_name, params);
978 let mut stmt = self.conn.prepare(&sql)?;
979 let rows = stmt
980 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
981 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
982 })?
983 .collect::<std::result::Result<Vec<_>, _>>()?;
984
985 Ok(rows)
986 }
987
988 pub fn create_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
994 let (sql, _) = sql_builders::create_lsi_table(table_name, index_name);
995 self.conn.execute_batch(&sql)?;
996 Ok(())
997 }
998
999 pub fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1001 let (sql, params) = sql_builders::drop_lsi_table(table_name, index_name);
1002 self.conn
1003 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
1004 Ok(())
1005 }
1006
1007 #[allow(clippy::too_many_arguments)]
1013 pub fn insert_lsi_item(
1014 &self,
1015 table_name: &str,
1016 index_name: &str,
1017 pk: &str,
1018 sk: &str,
1019 base_pk: &str,
1020 base_sk: &str,
1021 item_json: &str,
1022 ) -> Result<()> {
1023 let sql = sql_builders::lsi_insert_sql(table_name, index_name);
1024 let params = sql_builders::lsi_insert_params(pk, sk, base_pk, base_sk, item_json);
1025 self.conn
1026 .prepare_cached(&sql)?
1027 .execute(rusqlite::params_from_iter(params.iter()))?;
1028 Ok(())
1029 }
1030
1031 pub fn delete_lsi_item(
1033 &self,
1034 table_name: &str,
1035 index_name: &str,
1036 base_pk: &str,
1037 base_sk: &str,
1038 ) -> Result<()> {
1039 let (sql, params) = sql_builders::delete_lsi_item(table_name, index_name, base_pk, base_sk);
1040 self.conn
1041 .prepare_cached(&sql)?
1042 .execute(rusqlite::params_from_iter(params.iter()))?;
1043 Ok(())
1044 }
1045
1046 pub fn query_lsi_items(
1048 &self,
1049 table_name: &str,
1050 index_name: &str,
1051 pk: &str,
1052 params: &QueryParams,
1053 ) -> Result<Vec<(String, String, String)>> {
1054 let (sql, params_vec) = sql_builders::query_lsi_items(table_name, index_name, pk, params);
1055 let mut stmt = self.conn.prepare(&sql)?;
1056 let rows = stmt
1057 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1058 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1059 })?
1060 .collect::<std::result::Result<Vec<_>, _>>()?;
1061
1062 Ok(rows)
1063 }
1064
1065 pub fn scan_lsi_items(
1067 &self,
1068 table_name: &str,
1069 index_name: &str,
1070 params: &ScanParams,
1071 ) -> Result<Vec<(String, String, String)>> {
1072 let (sql, params_vec) = sql_builders::scan_lsi_items(table_name, index_name, params);
1073 let mut stmt = self.conn.prepare(&sql)?;
1074 let rows = stmt
1075 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1076 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1077 })?
1078 .collect::<std::result::Result<Vec<_>, _>>()?;
1079
1080 Ok(rows)
1081 }
1082
1083 pub fn begin_transaction(&self) -> Result<()> {
1089 self.conn.execute_batch(sql_builders::BEGIN)?;
1090 Ok(())
1091 }
1092
1093 pub fn commit(&self) -> Result<()> {
1095 self.conn.execute_batch(sql_builders::COMMIT)?;
1096 Ok(())
1097 }
1098
1099 pub fn rollback(&self) -> Result<()> {
1101 self.conn.execute_batch(sql_builders::ROLLBACK)?;
1102 Ok(())
1103 }
1104
    /// Applies connection settings intended for bulk loading: synchronous
    /// writes off, a 64000 KiB page cache (negative `cache_size` values are
    /// in KiB), in-memory temp storage and a 256 MiB memory map.
    ///
    /// `disable_bulk_loading` sets these pragmas back to fixed values.
    pub fn enable_bulk_loading(&self) -> Result<()> {
        self.conn.execute_batch(
            "PRAGMA synchronous = OFF;
             PRAGMA cache_size = -64000;
             PRAGMA temp_store = MEMORY;
             PRAGMA mmap_size = 268435456;",
        )?;
        Ok(())
    }
1119
    /// Sets the pragmas changed by `enable_bulk_loading` to fixed values:
    /// `synchronous = NORMAL`, a 2000 KiB page cache, default temp storage
    /// and memory mapping disabled.
    // NOTE(review): these are hard-coded values, not the values that were in
    // effect before `enable_bulk_loading` ran — confirm that is intended.
    pub fn disable_bulk_loading(&self) -> Result<()> {
        self.conn.execute_batch(
            "PRAGMA synchronous = NORMAL;
             PRAGMA cache_size = -2000;
             PRAGMA temp_store = DEFAULT;
             PRAGMA mmap_size = 0;",
        )?;
        Ok(())
    }
1130
    /// Stores an item with an empty hash prefix.
    ///
    /// Shorthand for `put_item_with_hash` with `hash_prefix` set to `""`;
    /// returns whatever that method returns (the previously stored item
    /// JSON, if any).
    pub fn put_item(
        &self,
        table_name: &str,
        pk: &str,
        sk: &str,
        item_json: &str,
        item_size: usize,
    ) -> Result<Option<String>> {
        self.put_item_with_hash(table_name, pk, sk, item_json, item_size, "")
    }
1146
1147 pub fn put_item_with_hash(
1149 &self,
1150 table_name: &str,
1151 pk: &str,
1152 sk: &str,
1153 item_json: &str,
1154 item_size: usize,
1155 hash_prefix: &str,
1156 ) -> Result<Option<String>> {
1157 let old_item = self.get_item(table_name, pk, sk)?;
1159
1160 let (sql, params) =
1161 sql_builders::put_item_with_hash(table_name, pk, sk, item_json, item_size, hash_prefix);
1162 self.conn
1163 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
1164
1165 Ok(old_item)
1166 }
1167
1168 pub fn put_base_items(
1173 &self,
1174 table_name: &str,
1175 rows: &[crate::storage_backend::BaseItemRow],
1176 ) -> Result<()> {
1177 let escaped = escape_table_name(table_name);
1178 let sql = format!(
1179 "INSERT OR REPLACE INTO \"{escaped}\" (pk, sk, item_json, item_size, cached_at, hash_prefix) \
1180 VALUES (?1, ?2, ?3, ?4, ?5, ?6)"
1181 );
1182 let mut stmt = self.conn.prepare_cached(&sql)?;
1183 for row in rows {
1184 stmt.execute(params![
1185 row.pk,
1186 row.sk,
1187 row.item_json,
1188 row.item_size as i64,
1189 row.cached_at,
1190 row.hash_prefix
1191 ])?;
1192 }
1193 Ok(())
1194 }
1195
1196 pub fn get_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1198 let (sql, params) = sql_builders::get_item(table_name, pk, sk);
1199 let result = self
1200 .conn
1201 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1202 row.get(0)
1203 });
1204
1205 match result {
1206 Ok(json) => Ok(Some(json)),
1207 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1208 Err(e) => Err(DynoxideError::from(e)),
1209 }
1210 }
1211
1212 pub fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64> {
1214 let (sql, params) = sql_builders::get_partition_size(table_name, pk);
1215 let size: i64 =
1216 self.conn
1217 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1218 row.get(0)
1219 })?;
1220 Ok(size)
1221 }
1222
1223 pub fn get_lsi_partition_size(
1226 &self,
1227 table_name: &str,
1228 index_name: &str,
1229 pk: &str,
1230 ) -> Result<i64> {
1231 let (sql, params) = sql_builders::get_lsi_partition_size(table_name, index_name, pk);
1232 let size: i64 =
1233 self.conn
1234 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1235 row.get(0)
1236 })?;
1237 Ok(size)
1238 }
1239
1240 pub fn delete_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1242 let old_item = self.get_item(table_name, pk, sk)?;
1243
1244 let (sql, params) = sql_builders::delete_item(table_name, pk, sk);
1245 self.conn
1246 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
1247
1248 Ok(old_item)
1249 }
1250
1251 pub fn query_items(
1256 &self,
1257 table_name: &str,
1258 pk: &str,
1259 params: &QueryParams,
1260 ) -> Result<Vec<(String, String, String)>> {
1261 let (sql, params_vec) = sql_builders::query_items(table_name, pk, params);
1262 let mut stmt = self.conn.prepare(&sql)?;
1263 let rows = stmt
1264 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1265 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1266 })?
1267 .collect::<std::result::Result<Vec<_>, _>>()?;
1268
1269 Ok(rows)
1270 }
1271
1272 pub fn scan_items(
1277 &self,
1278 table_name: &str,
1279 params: &ScanParams,
1280 ) -> Result<Vec<(String, String, String)>> {
1281 let (sql, params_vec) = sql_builders::scan_items(table_name, params);
1282 let mut stmt = self.conn.prepare(&sql)?;
1283 let rows = stmt
1284 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1285 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1286 })?
1287 .collect::<std::result::Result<Vec<_>, _>>()?;
1288
1289 Ok(rows)
1290 }
1291
1292 pub fn count_items(&self, table_name: &str) -> Result<i64> {
1294 let (sql, params) = sql_builders::count_items(table_name);
1295 let count: i64 =
1296 self.conn
1297 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1298 row.get(0)
1299 })?;
1300 Ok(count)
1301 }
1302
1303 pub fn db_path(&self) -> Option<String> {
1309 self.conn
1310 .path()
1311 .filter(|p| !p.is_empty())
1312 .map(|p| p.to_owned())
1313 }
1314
1315 pub fn db_size_bytes(&self) -> Result<u64> {
1317 let size: i64 = self.conn.query_row(
1318 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1319 [],
1320 |row| row.get(0),
1321 )?;
1322 Ok(size as u64)
1323 }
1324
1325 pub fn table_count(&self) -> Result<usize> {
1327 let count: i64 = self
1328 .conn
1329 .query_row("SELECT COUNT(*) FROM _tables", [], |row| row.get(0))?;
1330 Ok(count as usize)
1331 }
1332
1333 pub fn table_stats(&self) -> Result<Vec<TableStats>> {
1337 let table_names = self.list_table_names()?;
1338 let mut stats = Vec::with_capacity(table_names.len());
1339 for name in table_names {
1340 let sql = format!(
1341 "SELECT COUNT(*), COALESCE(SUM(item_size), 0) FROM \"{}\"",
1342 escape_table_name(&name)
1343 );
1344 let (item_count, size_bytes): (i64, i64) = self
1345 .conn
1346 .query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
1347 stats.push(TableStats {
1348 table_name: name,
1349 item_count,
1350 size_bytes: size_bytes as u64,
1351 });
1352 }
1353 Ok(stats)
1354 }
1355
1356 pub fn database_info(&self) -> Result<DatabaseInfo> {
1361 let path = self.db_path();
1362 let size_bytes = self.db_size_bytes()?;
1363 let table_count = self.table_count()?;
1364 let stats = self.table_stats()?;
1365
1366 let mut table_details = Vec::with_capacity(stats.len());
1367 for s in stats {
1368 let metadata = self.get_table_metadata(&s.table_name)?;
1369 table_details.push(TableInfoEntry { stats: s, metadata });
1370 }
1371
1372 Ok(DatabaseInfo {
1373 path,
1374 size_bytes,
1375 table_count,
1376 tables: table_details,
1377 })
1378 }
1379
1380 pub fn vacuum_into(&self, path: &str) -> Result<()> {
1387 if path.contains('\0') {
1388 return Err(DynoxideError::ValidationException(
1389 "path contains null byte".to_string(),
1390 ));
1391 }
1392 self.conn
1393 .execute_batch(&format!("VACUUM INTO '{}'", path.replace('\'', "''")))?;
1394 Ok(())
1395 }
1396
1397 pub fn vacuum(&self) -> Result<()> {
1399 self.conn.execute_batch("VACUUM")?;
1400 Ok(())
1401 }
1402
1403 pub fn restore_from(&mut self, path: &str) -> Result<()> {
1407 let source = Connection::open(path)?;
1408 self.restore_from_connection(&source)
1409 }
1410
1411 pub fn backup_to_memory(&self) -> Result<Connection> {
1416 let mut dest = Connection::open_in_memory()?;
1417 {
1418 let backup = rusqlite::backup::Backup::new(&self.conn, &mut dest)?;
1419 backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1420 }
1421 Ok(dest)
1422 }
1423
1424 pub fn restore_from_connection(&mut self, source: &Connection) -> Result<()> {
1429 let backup = rusqlite::backup::Backup::new(source, &mut self.conn)?;
1430 backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1431 self.metadata_cache.borrow_mut().clear();
1432 Ok(())
1433 }
1434
1435 pub fn connection_size_bytes(conn: &Connection) -> Result<u64> {
1437 let size: i64 = conn.query_row(
1438 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1439 [],
1440 |row| row.get(0),
1441 )?;
1442 Ok(size as u64)
1443 }
1444
1445 pub fn enable_stream(&self, table_name: &str, view_type: &str, label: &str) -> Result<()> {
1451 self.conn.execute(
1452 "UPDATE _tables SET stream_enabled = 1, stream_view_type = ?1, stream_label = ?2 WHERE table_name = ?3",
1453 params![view_type, label, table_name],
1454 )?;
1455 self.metadata_cache.borrow_mut().remove(table_name);
1456 Ok(())
1457 }
1458
1459 pub fn disable_stream(&self, table_name: &str) -> Result<()> {
1461 self.conn.execute(
1462 "UPDATE _tables SET stream_enabled = 0 WHERE table_name = ?1",
1463 params![table_name],
1464 )?;
1465 self.metadata_cache.borrow_mut().remove(table_name);
1466 Ok(())
1467 }
1468
1469 #[allow(clippy::too_many_arguments)]
1471 pub fn insert_stream_record(
1472 &self,
1473 table_name: &str,
1474 event_name: &str,
1475 keys_json: &str,
1476 new_image: Option<&str>,
1477 old_image: Option<&str>,
1478 sequence_number: &str,
1479 shard_id: &str,
1480 created_at: i64,
1481 ) -> Result<()> {
1482 self.insert_stream_record_with_identity(
1483 table_name,
1484 event_name,
1485 keys_json,
1486 new_image,
1487 old_image,
1488 sequence_number,
1489 shard_id,
1490 created_at,
1491 None,
1492 )
1493 }
1494
1495 #[allow(clippy::too_many_arguments)]
1497 pub fn insert_stream_record_with_identity(
1498 &self,
1499 table_name: &str,
1500 event_name: &str,
1501 keys_json: &str,
1502 new_image: Option<&str>,
1503 old_image: Option<&str>,
1504 sequence_number: &str,
1505 shard_id: &str,
1506 created_at: i64,
1507 user_identity: Option<&str>,
1508 ) -> Result<()> {
1509 self.conn.execute(
1510 "INSERT INTO _stream_records (table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity)
1511 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1512 params![table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity],
1513 )?;
1514 Ok(())
1515 }
1516
1517 pub fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64> {
1519 let result: std::result::Result<i64, _> = self.conn.query_row(
1520 "SELECT COALESCE(MAX(CAST(sequence_number AS INTEGER)), 0) + 1 FROM _stream_records WHERE table_name = ?1",
1521 params![table_name],
1522 |row| row.get(0),
1523 );
1524 match result {
1525 Ok(n) => Ok(n),
1526 Err(_) => Ok(1),
1527 }
1528 }
1529
1530 pub fn get_stream_records(
1532 &self,
1533 table_name: &str,
1534 shard_id: &str,
1535 after_sequence: i64,
1536 limit: usize,
1537 ) -> Result<Vec<StreamRecord>> {
1538 let mut stmt = self.conn.prepare(
1539 "SELECT event_name, keys_json, new_image, old_image, sequence_number, created_at, user_identity
1540 FROM _stream_records
1541 WHERE table_name = ?1 AND shard_id = ?2 AND CAST(sequence_number AS INTEGER) > ?3
1542 ORDER BY CAST(sequence_number AS INTEGER) ASC
1543 LIMIT ?4",
1544 )?;
1545 let rows = stmt
1546 .query_map(
1547 params![table_name, shard_id, after_sequence, limit as i64],
1548 |row| {
1549 Ok(StreamRecord {
1550 event_name: row.get(0)?,
1551 keys_json: row.get(1)?,
1552 new_image: row.get(2)?,
1553 old_image: row.get(3)?,
1554 sequence_number: row.get(4)?,
1555 created_at: row.get(5)?,
1556 user_identity: row.get(6)?,
1557 })
1558 },
1559 )?
1560 .collect::<std::result::Result<Vec<_>, _>>()?;
1561 Ok(rows)
1562 }
1563
1564 pub fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1566 let sql = format!(
1567 "SELECT {} FROM _tables WHERE stream_enabled = 1 ORDER BY table_name",
1568 sql_builders::TABLE_METADATA_COLUMNS
1569 );
1570 let mut stmt = self.conn.prepare(&sql)?;
1571 let rows = stmt
1572 .query_map([], row_to_metadata)?
1573 .collect::<std::result::Result<Vec<_>, _>>()?;
1574 Ok(rows)
1575 }
1576
1577 pub fn update_ttl_config(
1583 &self,
1584 table_name: &str,
1585 attribute_name: Option<&str>,
1586 enabled: bool,
1587 ) -> Result<()> {
1588 self.conn.execute(
1589 "UPDATE _tables SET ttl_attribute = ?1, ttl_enabled = ?2 WHERE table_name = ?3",
1590 params![attribute_name, enabled as i32, table_name],
1591 )?;
1592 self.metadata_cache.borrow_mut().remove(table_name);
1593 Ok(())
1594 }
1595
1596 pub fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1598 let sql = format!(
1599 "SELECT {} FROM _tables WHERE ttl_enabled = 1 ORDER BY table_name",
1600 sql_builders::TABLE_METADATA_COLUMNS
1601 );
1602 let mut stmt = self.conn.prepare(&sql)?;
1603 let rows = stmt
1604 .query_map([], row_to_metadata)?
1605 .collect::<std::result::Result<Vec<_>, _>>()?;
1606 Ok(rows)
1607 }
1608
1609 pub fn get_shard_sequence_range(
1611 &self,
1612 table_name: &str,
1613 shard_id: &str,
1614 ) -> Result<(Option<String>, Option<String>)> {
1615 let result: std::result::Result<(Option<String>, Option<String>), _> = self.conn.query_row(
1616 "SELECT MIN(sequence_number), MAX(sequence_number) FROM _stream_records WHERE table_name = ?1 AND shard_id = ?2",
1617 params![table_name, shard_id],
1618 |row| Ok((row.get(0)?, row.get(1)?)),
1619 );
1620 match result {
1621 Ok(range) => Ok(range),
1622 Err(_) => Ok((None, None)),
1623 }
1624 }
1625
1626 pub fn touch_cached_at(
1632 &self,
1633 table_name: &str,
1634 pk: &str,
1635 sk: &str,
1636 timestamp: f64,
1637 ) -> Result<()> {
1638 let sql = format!(
1639 "UPDATE \"{}\" SET cached_at = ?1 WHERE pk = ?2 AND sk = ?3",
1640 escape_table_name(table_name)
1641 );
1642 self.conn.execute(&sql, params![timestamp, pk, sk])?;
1643 Ok(())
1644 }
1645
1646 pub fn get_lru_items(
1651 &self,
1652 table_name: &str,
1653 limit: usize,
1654 ) -> Result<Vec<(String, String, i64)>> {
1655 let sql = format!(
1656 "SELECT pk, sk, item_size FROM \"{}\" WHERE cached_at IS NOT NULL ORDER BY cached_at ASC LIMIT ?1",
1657 escape_table_name(table_name)
1658 );
1659 let mut stmt = self.conn.prepare(&sql)?;
1660 let rows = stmt
1661 .query_map(params![limit as i64], |row| {
1662 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1663 })?
1664 .collect::<std::result::Result<Vec<_>, _>>()?;
1665 Ok(rows)
1666 }
1667}
1668
/// One row of the `_stream_records` table, as read by `get_stream_records`.
///
/// Every field holds the value of the column with the same name.
#[derive(Debug, Clone)]
pub struct StreamRecord {
    /// Name of the event that produced the record.
    pub event_name: String,
    /// Text of the `keys_json` column (presumably the item's key attributes
    /// as JSON — confirm against the writer).
    pub keys_json: String,
    /// Text of the `new_image` column; `None` when the column is NULL.
    pub new_image: Option<String>,
    /// Text of the `old_image` column; `None` when the column is NULL.
    pub old_image: Option<String>,
    /// Sequence number as stored, i.e. as text; queries order it by its
    /// integer value.
    pub sequence_number: String,
    /// Value of the `created_at` column (unit not visible here).
    pub created_at: i64,
    /// Text of the `user_identity` column; `None` when the column is NULL.
    pub user_identity: Option<String>,
}
1680
/// Item count and accumulated item size of a single table, as produced by
/// `table_stats`.
#[derive(Debug, Clone)]
pub struct TableStats {
    /// Name of the table the numbers belong to.
    pub table_name: String,
    /// Number of rows in the table (`COUNT(*)`).
    pub item_count: i64,
    /// Sum of the table's `item_size` column; 0 for an empty table.
    pub size_bytes: u64,
}
1688
/// Summary of the whole database, as assembled by `database_info`.
#[derive(Debug, Clone)]
pub struct DatabaseInfo {
    /// Path reported by the connection; `None` when it reports none or an
    /// empty one.
    pub path: Option<String>,
    /// Database size computed as `page_count * page_size`.
    pub size_bytes: u64,
    /// Number of rows in the `_tables` metadata table.
    pub table_count: usize,
    /// One entry per table, pairing its statistics with its metadata.
    pub tables: Vec<TableInfoEntry>,
}
1697
/// Statistics and metadata of one table, as listed in `DatabaseInfo::tables`.
#[derive(Debug, Clone)]
pub struct TableInfoEntry {
    /// Item count and size of the table.
    pub stats: TableStats,
    /// Result of `get_table_metadata` for the table; `None` when that lookup
    /// found no metadata row.
    pub metadata: Option<TableMetadata>,
}
1704
/// In-memory form of one row of the `_tables` metadata table.
///
/// Rows are decoded by `row_to_metadata`, which reads the columns by
/// position in the order the fields are declared here.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// Name of the table.
    pub table_name: String,
    /// Key schema as stored text (the tests in this file store a JSON array).
    pub key_schema: String,
    /// Attribute definitions as stored text (the tests in this file store a
    /// JSON array).
    pub attribute_definitions: String,
    /// Stored GSI definitions; `None` when the column is NULL.
    pub gsi_definitions: Option<String>,
    /// Stored LSI definitions; `None` when the column is NULL.
    pub lsi_definitions: Option<String>,
    /// Whether the stream flag is set (stored as an integer, non-zero = true).
    pub stream_enabled: bool,
    /// Stream view type written by `enable_stream`; `None` when NULL.
    pub stream_view_type: Option<String>,
    /// Stream label written by `enable_stream`; `None` when NULL.
    pub stream_label: Option<String>,
    /// TTL attribute name written by `update_ttl_config`; `None` when NULL.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled (stored as an integer, non-zero = true).
    pub ttl_enabled: bool,
    /// Value of the `created_at` column (unit not visible here).
    pub created_at: i64,
    /// Value of the `table_status` column, e.g. `"ACTIVE"`.
    pub table_status: String,
    /// Value of the `billing_mode` column; `None` when NULL.
    pub billing_mode: Option<String>,
    /// Value of the `provisioned_throughput` column; `None` when NULL.
    pub provisioned_throughput: Option<String>,
    /// Value of the `sse_specification` column; `None` when NULL.
    pub sse_specification: Option<String>,
    /// Value of the `table_class` column; `None` when NULL.
    pub table_class: Option<String>,
    /// Whether deletion protection is set; decoding falls back to `false`
    /// when the column cannot be read as an integer.
    pub deletion_protection_enabled: bool,
    /// Value of the `on_demand_throughput` column; `None` when NULL.
    pub on_demand_throughput: Option<String>,
    /// Value of the `table_id` column; `None` when NULL.
    pub table_id: Option<String>,
}
1734
/// Decodes one result row into a `TableMetadata`.
///
/// Columns are read by position, so the SELECT list must supply them in the
/// order the fields are read here (callers in this file use
/// `sql_builders::TABLE_METADATA_COLUMNS`). Integer flag columns are turned
/// into booleans, non-zero meaning `true`.
///
/// # Errors
///
/// Returns the first column read error, except for the deletion-protection
/// flag (column 16), which falls back to `false` when it cannot be read as an
/// integer.
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
fn row_to_metadata(row: &rusqlite::Row) -> rusqlite::Result<TableMetadata> {
    // Reads an integer column and interprets any non-zero value as `true`.
    let int_flag = |idx: usize| row.get::<_, i32>(idx).map(|value| value != 0);

    // The fields are listed in column order so that errors surface in the
    // same order as the columns appear.
    let metadata = TableMetadata {
        table_name: row.get(0)?,
        key_schema: row.get(1)?,
        attribute_definitions: row.get(2)?,
        gsi_definitions: row.get(3)?,
        lsi_definitions: row.get(4)?,
        stream_enabled: int_flag(5)?,
        stream_view_type: row.get(6)?,
        stream_label: row.get(7)?,
        ttl_attribute: row.get(8)?,
        ttl_enabled: int_flag(9)?,
        created_at: row.get(10)?,
        table_status: row.get(11)?,
        billing_mode: row.get(12)?,
        provisioned_throughput: row.get(13)?,
        sse_specification: row.get(14)?,
        table_class: row.get(15)?,
        deletion_protection_enabled: int_flag(16).unwrap_or(false),
        on_demand_throughput: row.get(17)?,
        table_id: row.get(18)?,
    };
    Ok(metadata)
}
1760
#[cfg(all(test, any(feature = "native-sqlite", feature = "_has-encryption")))]
mod tests {
    use super::*;

    /// Key schema used by every fixture table: a single HASH key named `pk`.
    const PK_KEY_SCHEMA: &str = r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#;
    /// Attribute definitions matching `PK_KEY_SCHEMA`.
    const PK_ATTRIBUTE_DEFINITIONS: &str = r#"[{"AttributeName":"pk","AttributeType":"S"}]"#;

    /// Creates a fresh in-memory storage for one test.
    fn test_storage() -> Storage {
        Storage::memory().expect("Failed to create in-memory storage")
    }

    /// Reads the schema version recorded in `_config`.
    fn stored_schema_version(storage: &Storage) -> String {
        storage
            .conn()
            .query_row(
                "SELECT value FROM _config WHERE key = 'schema_version'",
                [],
                |row| row.get(0),
            )
            .unwrap()
    }

    /// Registers metadata for a table with the shared `pk` key schema.
    fn register_table(storage: &Storage, table_name: &'static str) {
        storage
            .insert_table_metadata(&CreateTableMetadata {
                table_name,
                key_schema: PK_KEY_SCHEMA,
                attribute_definitions: PK_ATTRIBUTE_DEFINITIONS,
                created_at: 1000000,
                ..Default::default()
            })
            .unwrap();
    }

    /// Writes an old-format database to `path`: the given schema, the given
    /// schema-version row and a single `LegacyTable` metadata row. The
    /// connection is closed again before returning.
    fn seed_legacy_database(path: &str, schema_sql: &str, version_insert_sql: &str) {
        let conn = Connection::open(path).unwrap();
        conn.execute_batch(schema_sql).unwrap();
        conn.execute(version_insert_sql, []).unwrap();
        conn.execute(
            "INSERT INTO _tables (table_name, key_schema, attribute_definitions, created_at) \
             VALUES ('LegacyTable', ?1, ?2, 0)",
            params![PK_KEY_SCHEMA, PK_ATTRIBUTE_DEFINITIONS],
        )
        .unwrap();
    }

    #[test]
    fn test_initialize_creates_metadata_tables() {
        let storage = test_storage();
        assert_eq!(stored_schema_version(&storage), SCHEMA_VERSION);
    }

    #[test]
    fn test_migrate_v6_to_v7_adds_on_demand_throughput_column() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();

        // A version-6 database: `_tables` has no `on_demand_throughput` column yet.
        seed_legacy_database(
            &path,
            "CREATE TABLE _config (key TEXT PRIMARY KEY, value TEXT NOT NULL);
             CREATE TABLE _tables (
                 table_name TEXT PRIMARY KEY,
                 key_schema TEXT NOT NULL,
                 attribute_definitions TEXT NOT NULL,
                 gsi_definitions TEXT,
                 lsi_definitions TEXT,
                 stream_enabled INTEGER DEFAULT 0,
                 stream_view_type TEXT,
                 stream_label TEXT,
                 ttl_attribute TEXT,
                 ttl_enabled INTEGER DEFAULT 0,
                 created_at INTEGER NOT NULL,
                 table_status TEXT NOT NULL DEFAULT 'ACTIVE',
                 billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
                 provisioned_throughput TEXT,
                 tags TEXT,
                 sse_specification TEXT,
                 table_class TEXT,
                 deletion_protection_enabled INTEGER DEFAULT 0
             );",
            "INSERT INTO _config (key, value) VALUES ('schema_version', '6')",
        );

        // Opening the file through `Storage` must bring it to the current version.
        let storage = Storage::new(&path).unwrap();
        assert_eq!(stored_schema_version(&storage), SCHEMA_VERSION);

        let legacy = storage.get_table_metadata("LegacyTable").unwrap().unwrap();
        assert_eq!(legacy.table_name, "LegacyTable");
        assert!(legacy.on_demand_throughput.is_none());

        // The new column exists and is NULL for the pre-existing row.
        let raw_column: Option<String> = storage
            .conn()
            .query_row(
                "SELECT on_demand_throughput FROM _tables WHERE table_name = 'LegacyTable'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(raw_column.is_none());
    }

    #[test]
    fn test_migrate_v7_to_v8_backfills_table_id() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();

        // A version-7 database: `_tables` has no `table_id` column yet.
        seed_legacy_database(
            &path,
            "CREATE TABLE _config (key TEXT PRIMARY KEY, value TEXT NOT NULL);
             CREATE TABLE _tables (
                 table_name TEXT PRIMARY KEY,
                 key_schema TEXT NOT NULL,
                 attribute_definitions TEXT NOT NULL,
                 gsi_definitions TEXT,
                 lsi_definitions TEXT,
                 stream_enabled INTEGER DEFAULT 0,
                 stream_view_type TEXT,
                 stream_label TEXT,
                 ttl_attribute TEXT,
                 ttl_enabled INTEGER DEFAULT 0,
                 created_at INTEGER NOT NULL,
                 table_status TEXT NOT NULL DEFAULT 'ACTIVE',
                 billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
                 provisioned_throughput TEXT,
                 tags TEXT,
                 sse_specification TEXT,
                 table_class TEXT,
                 deletion_protection_enabled INTEGER DEFAULT 0,
                 on_demand_throughput TEXT
             );",
            "INSERT INTO _config (key, value) VALUES ('schema_version', '7')",
        );

        let storage = Storage::new(&path).unwrap();
        assert_eq!(stored_schema_version(&storage), SCHEMA_VERSION);

        let migrated = storage.get_table_metadata("LegacyTable").unwrap().unwrap();
        let id = migrated
            .table_id
            .expect("legacy table should be backfilled");
        assert!(!id.is_empty());

        // Reopening must keep the id that was assigned during the migration.
        drop(storage);
        let reopened = Storage::new(&path).unwrap();
        let after_reopen = reopened.get_table_metadata("LegacyTable").unwrap().unwrap();
        assert_eq!(after_reopen.table_id.as_deref(), Some(id.as_str()));
    }

    #[test]
    fn test_wal_mode_enabled() {
        let storage = test_storage();
        let mode: String = storage
            .conn()
            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
            .unwrap();
        // In-memory databases report "memory" instead of "wal".
        assert!(matches!(mode.as_str(), "wal" | "memory"), "Got mode: {mode}");
    }

    #[test]
    fn fnv1a_hash_matches_known_vectors() {
        let storage = test_storage();
        let cases: [(&str, i64); 6] = [
            ("", 2166136261),
            ("a", 3826002220),
            ("u#1", 2199603432),
            ("artist#42", 2385694177),
            ("café", 2821410889),
            ("tenant#9007199254740993", 2022216178),
        ];
        cases.into_iter().for_each(|(input, expected)| {
            let got: i64 = storage
                .conn()
                .query_row("SELECT fnv1a_hash(?1)", [input], |row| row.get(0))
                .unwrap();
            assert_eq!(got, expected, "fnv1a_hash({input:?})");
        });
    }

    #[test]
    fn test_table_metadata_crud() {
        let storage = test_storage();

        // Nothing is registered in a fresh database.
        assert!(!storage.table_exists("TestTable").unwrap());
        assert!(storage.list_table_names().unwrap().is_empty());

        register_table(&storage, "TestTable");

        assert!(storage.table_exists("TestTable").unwrap());
        assert_eq!(storage.list_table_names().unwrap(), vec!["TestTable"]);

        let stored = storage.get_table_metadata("TestTable").unwrap().unwrap();
        assert_eq!(stored.table_name, "TestTable");
        assert_eq!(stored.table_status, "ACTIVE");
        assert_eq!(stored.created_at, 1000000);

        assert!(storage.delete_table_metadata("TestTable").unwrap());
        assert!(!storage.table_exists("TestTable").unwrap());
    }

    #[test]
    fn test_create_and_drop_data_table() {
        let storage = test_storage();
        storage.create_data_table("MyTable").unwrap();

        storage
            .put_item("MyTable", "pk1", "", r#"{"pk":{"S":"pk1"}}"#, 10)
            .unwrap();

        let fetched = storage.get_item("MyTable", "pk1", "").unwrap();
        assert!(fetched.is_some());

        storage.drop_data_table("MyTable").unwrap();
    }

    #[test]
    fn test_item_crud() {
        let storage = test_storage();
        storage.create_data_table("Items").unwrap();

        // First write: nothing to replace.
        let replaced = storage
            .put_item(
                "Items",
                "user#1",
                "profile",
                r#"{"name":{"S":"Alice"}}"#,
                20,
            )
            .unwrap();
        assert!(replaced.is_none());

        let fetched = storage.get_item("Items", "user#1", "profile").unwrap();
        assert_eq!(fetched.unwrap(), r#"{"name":{"S":"Alice"}}"#);

        // Overwrite: the previous image comes back.
        let replaced = storage
            .put_item("Items", "user#1", "profile", r#"{"name":{"S":"Bob"}}"#, 18)
            .unwrap();
        assert_eq!(replaced.unwrap(), r#"{"name":{"S":"Alice"}}"#);

        // Delete: the last image comes back and the item is gone afterwards.
        let removed = storage.delete_item("Items", "user#1", "profile").unwrap();
        assert_eq!(removed.unwrap(), r#"{"name":{"S":"Bob"}}"#);

        assert!(
            storage
                .get_item("Items", "user#1", "profile")
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn test_query_items() {
        let storage = test_storage();
        storage.create_data_table("Orders").unwrap();

        (1..=5).for_each(|i| {
            let sk = format!("order#{i:03}");
            let json = format!(r#"{{"id":{{"N":"{i}"}}}}"#);
            storage
                .put_item("Orders", "user#1", &sk, &json, 10)
                .unwrap();
        });

        // Forward, unlimited: everything, smallest sort key first.
        let all_forward = storage
            .query_items(
                "Orders",
                "user#1",
                &QueryParams {
                    forward: true,
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(all_forward.len(), 5);
        assert_eq!(all_forward[0].1, "order#001");

        // Forward with a limit.
        let first_two = storage
            .query_items(
                "Orders",
                "user#1",
                &QueryParams {
                    forward: true,
                    limit: Some(2),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(first_two.len(), 2);

        // Reverse with a limit: largest sort key first.
        let last_two = storage
            .query_items(
                "Orders",
                "user#1",
                &QueryParams {
                    forward: false,
                    limit: Some(2),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(last_two.len(), 2);
        assert_eq!(last_two[0].1, "order#005");
    }

    #[test]
    fn test_scan_items() {
        let storage = test_storage();
        storage.create_data_table("ScanTest").unwrap();

        for (pk, sk) in [("a", "1"), ("b", "2"), ("c", "3")] {
            storage.put_item("ScanTest", pk, sk, r#"{}"#, 2).unwrap();
        }

        let everything = storage.scan_items("ScanTest", &Default::default()).unwrap();
        assert_eq!(everything.len(), 3);

        let limited = storage
            .scan_items(
                "ScanTest",
                &ScanParams {
                    limit: Some(2),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(limited.len(), 2);

        // Resuming after ("a", "1") must start at the next item.
        let resumed = storage
            .scan_items(
                "ScanTest",
                &ScanParams {
                    limit: Some(2),
                    exclusive_start_pk: Some("a"),
                    exclusive_start_sk: Some("1"),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(resumed.len(), 2);
        assert_eq!(resumed[0].0, "b");
    }

    #[test]
    fn test_count_items() {
        let storage = test_storage();
        storage.create_data_table("CountTest").unwrap();

        assert_eq!(storage.count_items("CountTest").unwrap(), 0);

        for pk in ["a", "b"] {
            storage.put_item("CountTest", pk, "", r#"{}"#, 2).unwrap();
        }

        assert_eq!(storage.count_items("CountTest").unwrap(), 2);
    }

    #[test]
    fn test_gsi_table_lifecycle() {
        let storage = test_storage();
        storage.create_gsi_table("Orders", "ByDate").unwrap();

        // The index table must accept a row under its derived name.
        let gsi_name = "Orders::gsi::ByDate";
        let insert_sql = format!(
            "INSERT INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
            gsi_name.replace('"', "\"\"")
        );
        storage
            .conn()
            .execute(
                &insert_sql,
                params!["2024-01-01", "001", "user#1", "order#001", r#"{}"#],
            )
            .unwrap();

        storage.drop_gsi_table("Orders", "ByDate").unwrap();
    }

    #[test]
    fn test_nonexistent_table_metadata() {
        let storage = test_storage();
        assert!(storage.get_table_metadata("Nonexistent").unwrap().is_none());
        assert!(!storage.delete_table_metadata("Nonexistent").unwrap());
    }

    #[test]
    fn test_metadata_cache_hit() {
        let storage = test_storage();
        register_table(&storage, "CacheTest");

        let first = storage.get_table_metadata("CacheTest").unwrap().unwrap();
        assert_eq!(first.table_name, "CacheTest");

        let second = storage.get_table_metadata("CacheTest").unwrap().unwrap();
        assert_eq!(second.table_name, "CacheTest");
        assert_eq!(first.created_at, second.created_at);

        assert!(storage.metadata_cache.borrow().contains_key("CacheTest"));
    }

    #[test]
    fn test_metadata_cache_invalidated_on_delete() {
        let storage = test_storage();
        register_table(&storage, "DelCache");

        // Reading the metadata populates the cache.
        storage.get_table_metadata("DelCache").unwrap();
        assert!(storage.metadata_cache.borrow().contains_key("DelCache"));

        storage.delete_table_metadata("DelCache").unwrap();
        assert!(!storage.metadata_cache.borrow().contains_key("DelCache"));
    }

    #[test]
    fn test_metadata_cache_invalidated_on_stream_enable() {
        let storage = test_storage();
        register_table(&storage, "StreamCache");

        let before = storage.get_table_metadata("StreamCache").unwrap().unwrap();
        assert!(!before.stream_enabled);

        storage
            .enable_stream("StreamCache", "NEW_AND_OLD_IMAGES", "2024-01-01T00:00:00")
            .unwrap();
        assert!(!storage.metadata_cache.borrow().contains_key("StreamCache"));

        let after = storage.get_table_metadata("StreamCache").unwrap().unwrap();
        assert!(after.stream_enabled);
    }

    #[test]
    fn test_metadata_cache_invalidated_on_ttl_update() {
        let storage = test_storage();
        register_table(&storage, "TtlCache");

        let before = storage.get_table_metadata("TtlCache").unwrap().unwrap();
        assert!(!before.ttl_enabled);

        storage
            .update_ttl_config("TtlCache", Some("expires_at"), true)
            .unwrap();
        assert!(!storage.metadata_cache.borrow().contains_key("TtlCache"));

        let after = storage.get_table_metadata("TtlCache").unwrap().unwrap();
        assert!(after.ttl_enabled);
        assert_eq!(after.ttl_attribute, Some("expires_at".to_string()));
    }

    #[test]
    fn test_num_to_buffer_zero() {
        for zero in ["0", "-0"] {
            assert_eq!(num_to_buffer(zero), vec![0x80]);
        }
    }

    #[test]
    fn test_hash_prefix_string_keys() {
        let expectations = [("3635", 0), ("228", 0), ("1668", 1), ("3435", 4)];
        for (key, bucket) in expectations {
            let prefix = compute_hash_prefix(&AttributeValue::S(key.into()));
            assert_eq!(
                hash_bucket(&prefix),
                bucket,
                "{key} should be bucket {bucket}, got hash {prefix}"
            );
        }
    }

    #[test]
    fn test_hash_prefix_number_keys() {
        let expectations = [("251", 1), ("2388", 4095)];
        for (key, bucket) in expectations {
            let prefix = compute_hash_prefix(&AttributeValue::N(key.into()));
            assert_eq!(
                hash_bucket(&prefix),
                bucket,
                "{key} should be bucket {bucket}, got hash {prefix}"
            );
        }
    }

    #[test]
    fn test_hash_in_segment() {
        // (hash prefix, segment, total segments, expected membership)
        let expectations = [
            // One segment per bucket.
            ("000000", 0, 4096, true),
            ("000000", 1, 4096, false),
            ("001000", 1, 4096, true),
            ("001000", 0, 4096, false),
            ("fff000", 4095, 4096, true),
            ("fff000", 0, 4096, false),
            // Two segments: the bucket range is split in half.
            ("000000", 0, 2, true),
            ("7ff000", 0, 2, true),
            ("800000", 1, 2, true),
            ("fff000", 1, 2, true),
        ];
        for (prefix, segment, total, expected) in expectations {
            assert_eq!(hash_in_segment(prefix, segment, total), expected);
        }
    }
}