1#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
2use crate::errors::{DynoxideError, Result};
3#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
4use crate::storage_backend::clock::{Clock, SystemClock};
5#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
6use crate::storage_backend::sql_builders::{self, escape_table_name};
7use crate::types::AttributeValue;
8#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
9use rusqlite::{Connection, params};
10#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
11use std::{cell::RefCell, collections::HashMap, sync::Arc};
12
13#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
15const SCHEMA_VERSION: &str = "8";
16
17pub(crate) const HASH_BUCKETS: u32 = 4096;
20
21pub fn compute_hash_prefix(pk_value: &AttributeValue) -> String {
35 let key_bytes = match pk_value {
36 AttributeValue::S(s) => s.as_bytes().to_vec(),
37 AttributeValue::N(n) => num_to_buffer(n),
38 AttributeValue::B(b) => b.clone(),
39 _ => vec![], };
41
42 let digest = md5::compute([b"Outliers" as &[u8], &key_bytes].concat());
43 format!("{:032x}", digest)[..6].to_string()
44}
45
46pub fn hash_bucket(hash_prefix: &str) -> u32 {
48 let prefix_3 = &hash_prefix[..3.min(hash_prefix.len())];
49 u32::from_str_radix(prefix_3, 16).unwrap_or(0)
50}
51
52fn num_to_buffer(num_str: &str) -> Vec<u8> {
58 let trimmed = num_str.trim();
59 if trimmed.is_empty() {
60 return vec![0x80];
61 }
62
63 use bigdecimal::BigDecimal;
64 use std::str::FromStr;
65
66 let bd = match BigDecimal::from_str(trimmed) {
67 Ok(v) => v,
68 Err(_) => return vec![0x80],
69 };
70
71 if bd.sign() == bigdecimal::num_bigint::Sign::NoSign {
72 return vec![0x80];
73 }
74
75 let is_negative = bd.sign() == bigdecimal::num_bigint::Sign::Minus;
76 let bd_abs = if is_negative { -&bd } else { bd.clone() };
77
78 let (mantissa, exponent) = extract_mantissa_and_exponent(&bd_abs);
79 if mantissa.is_empty() {
80 return vec![0x80];
81 }
82
83 let append_zero: i64 = if exponent % 2 != 0 { 1 } else { 0 };
86 let byte_len_no_exp = ((mantissa.len() as i64 + append_zero + 1) / 2) as usize;
87
88 let mut byte_array: Vec<u8>;
89 if byte_len_no_exp < 20 && is_negative {
90 byte_array = vec![0u8; byte_len_no_exp + 2];
91 byte_array[byte_len_no_exp + 1] = 102;
92 } else {
93 byte_array = vec![0u8; byte_len_no_exp + 1];
94 }
95
96 let exp_sum = exponent + append_zero;
100 let exp_byte_val = floor_div(exp_sum, 2) - 64;
101 if is_negative {
102 byte_array[0] = (exp_byte_val ^ !0i64) as u8;
105 } else {
106 byte_array[0] = exp_byte_val as u8;
107 }
108
109 let mut mi: i64 = 0; let mlen = mantissa.len() as i64;
113 let mut appended_zero = false;
114
115 while mi < mlen {
116 let bai = ((mi + append_zero) / 2 + 1) as usize; if append_zero != 0 && mi == 0 && !appended_zero {
118 byte_array[bai] = 0;
119 appended_zero = true;
120 mi -= 1; } else if (mi + append_zero) % 2 == 0 {
122 byte_array[bai] = mantissa[mi as usize] * 10;
123 } else {
124 byte_array[bai] += mantissa[mi as usize];
125 }
126
127 if ((mi + append_zero) % 2 != 0) || (mi == mlen - 1) {
129 if is_negative {
130 byte_array[bai] = 101u8.wrapping_sub(byte_array[bai]);
131 } else {
132 byte_array[bai] = byte_array[bai].wrapping_add(1);
133 }
134 }
135
136 mi += 1; }
138
139 byte_array
140}
141
142fn floor_div(a: i64, b: i64) -> i64 {
144 let d = a / b;
145 let r = a % b;
146 if (r != 0) && ((r ^ b) < 0) { d - 1 } else { d }
147}
148
149fn extract_mantissa_and_exponent(bd: &bigdecimal::BigDecimal) -> (Vec<u8>, i64) {
156 let normalized = bd.normalized();
158
159 let (bigint, scale) = normalized.as_bigint_and_exponent();
162 let digits_str = bigint.to_string();
163 let digits_str = digits_str.trim_start_matches('-');
164
165 let digits: Vec<u8> = digits_str
166 .chars()
167 .map(|c| c.to_digit(10).unwrap() as u8)
168 .collect();
169
170 let exponent = digits.len() as i64 - scale;
173
174 (digits, exponent)
175}
176
177pub fn hash_in_segment(hash_prefix: &str, segment: u32, total_segments: u32) -> bool {
184 let bucket = hash_bucket(hash_prefix);
185 let start = ceiling_div(HASH_BUCKETS * segment, total_segments);
186 let end = ceiling_div(HASH_BUCKETS * (segment + 1), total_segments) - 1;
187 bucket >= start && bucket <= end
188}
189
190pub(crate) fn ceiling_div(a: u32, b: u32) -> u32 {
191 a.div_ceil(b)
192}
193
194#[derive(Debug, Default)]
196pub struct ScanParams<'a> {
197 pub limit: Option<usize>,
198 pub exclusive_start_pk: Option<&'a str>,
199 pub exclusive_start_sk: Option<&'a str>,
200 pub segment: Option<u32>,
201 pub total_segments: Option<u32>,
202 pub exclusive_start_base_pk: Option<&'a str>,
204 pub exclusive_start_base_sk: Option<&'a str>,
206}
207
208#[derive(Debug, Default)]
210pub struct CreateTableMetadata<'a> {
211 pub table_name: &'a str,
212 pub key_schema: &'a str,
213 pub attribute_definitions: &'a str,
214 pub gsi_definitions: Option<&'a str>,
215 pub lsi_definitions: Option<&'a str>,
216 pub provisioned_throughput: Option<&'a str>,
217 pub created_at: i64,
218 pub sse_specification: Option<&'a str>,
219 pub table_class: Option<&'a str>,
220 pub deletion_protection_enabled: bool,
221 pub billing_mode: Option<&'a str>,
222 pub on_demand_throughput: Option<&'a str>,
223}
224
225#[derive(Debug, Default)]
227pub struct QueryParams<'a> {
228 pub sk_condition: Option<&'a str>,
229 pub sk_params: &'a [&'a str],
230 pub forward: bool,
231 pub limit: Option<usize>,
232 pub exclusive_start_sk: Option<&'a str>,
233 pub exclusive_start_base_pk: Option<&'a str>,
235 pub exclusive_start_base_sk: Option<&'a str>,
237}
238
239#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
247pub struct Storage {
248 conn: Connection,
249 metadata_cache: RefCell<HashMap<String, TableMetadata>>,
252 clock: Arc<dyn Clock>,
254}
255
256#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
257impl Storage {
258 pub fn new(path: &str) -> Result<Self> {
260 let conn = Connection::open(path)?;
261 let mut storage = Self {
262 conn,
263 metadata_cache: RefCell::new(HashMap::new()),
264 clock: Arc::new(SystemClock),
265 };
266 storage.initialize().map_err(Self::maybe_encrypted_error)?;
267 Ok(storage)
268 }
269
270 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
276 self.clock = clock;
277 self
278 }
279
280 pub(crate) fn clock(&self) -> &dyn Clock {
282 self.clock.as_ref()
283 }
284
285 fn maybe_encrypted_error(err: DynoxideError) -> DynoxideError {
288 if let DynoxideError::SqliteError(ref sqlite_err) = err {
289 if let Some(rusqlite::ErrorCode::NotADatabase) = sqlite_err.sqlite_error_code() {
290 return DynoxideError::InternalServerError(
291 "Database file is encrypted or not a valid SQLite database. \
292 If encrypted, enable the `encryption` or `encryption-cc` feature \
293 and use Database::new_encrypted() with the correct key."
294 .to_string(),
295 );
296 }
297 }
298 err
299 }
300
301 #[cfg(feature = "_has-encryption")]
307 pub fn new_encrypted(path: &str, key: &str) -> Result<Self> {
308 use zeroize::Zeroize;
309
310 let conn = Connection::open(path)?;
311 let mut pragma_val = format!("x'{key}'");
316 conn.pragma_update(None, "key", &pragma_val)?;
317 pragma_val.zeroize();
318 conn.execute_batch("SELECT count(*) FROM sqlite_master;")?;
320 let mut storage = Self {
321 conn,
322 metadata_cache: RefCell::new(HashMap::new()),
323 clock: Arc::new(SystemClock),
324 };
325 storage.initialize()?;
326 Ok(storage)
327 }
328
329 pub fn memory() -> Result<Self> {
331 let conn = Connection::open_in_memory()?;
332 let mut storage = Self {
333 conn,
334 metadata_cache: RefCell::new(HashMap::new()),
335 clock: Arc::new(SystemClock),
336 };
337 storage.initialize()?;
338 Ok(storage)
339 }
340
341 fn initialize(&mut self) -> Result<()> {
343 self.conn.pragma_update(None, "journal_mode", "WAL")?;
345
346 self.conn.create_scalar_function(
349 "fnv1a_hash",
350 1,
351 rusqlite::functions::FunctionFlags::SQLITE_DETERMINISTIC
352 | rusqlite::functions::FunctionFlags::SQLITE_UTF8,
353 |ctx: &rusqlite::functions::Context| -> rusqlite::Result<i64> {
354 let pk_ref = ctx.get_raw(0);
355 let pk_bytes = match pk_ref {
356 rusqlite::types::ValueRef::Text(bytes) => bytes,
357 _ => {
358 return Err(rusqlite::Error::InvalidFunctionParameterType(
359 0,
360 rusqlite::types::Type::Text,
361 ));
362 }
363 };
364 let mut hash: u32 = 2166136261;
365 for &byte in pk_bytes {
366 hash ^= byte as u32;
367 hash = hash.wrapping_mul(16777619);
368 }
369 Ok(hash as i64)
370 },
371 )?;
372
373 self.conn.execute_batch(sql_builders::INIT_SCHEMA)?;
375
376 let _ = self
378 .conn
379 .execute_batch("ALTER TABLE _stream_records ADD COLUMN user_identity TEXT");
380
381 self.conn.execute(
383 "INSERT OR IGNORE INTO _config (key, value) VALUES ('schema_version', ?1)",
384 params![SCHEMA_VERSION],
385 )?;
386
387 let version: i32 = self
389 .conn
390 .query_row(
391 "SELECT value FROM _config WHERE key = 'schema_version'",
392 [],
393 |r| r.get::<_, String>(0),
394 )
395 .unwrap_or_else(|_| "1".to_string())
396 .parse()
397 .unwrap_or(1);
398
399 if version < 2 {
400 self.migrate_v1_to_v2()?;
401 }
402 if version < 3 {
403 self.migrate_v2_to_v3()?;
404 }
405 if version < 4 {
406 self.migrate_v3_to_v4()?;
407 }
408 if version < 5 {
409 self.migrate_v4_to_v5()?;
410 }
411 if version < 6 {
412 self.migrate_v5_to_v6()?;
413 }
414 if version < 7 {
415 self.migrate_v6_to_v7()?;
416 }
417 if version < 8 {
418 self.migrate_v7_to_v8()?;
419 }
420
421 Ok(())
422 }
423
424 fn migrate_v1_to_v2(&self) -> Result<()> {
426 let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
427 let table_names: Vec<String> = stmt
428 .query_map([], |row| row.get(0))?
429 .collect::<std::result::Result<Vec<_>, _>>()?;
430
431 for table_name in &table_names {
432 let escaped = format!("\"{}\"", table_name.replace('"', "\"\""));
433 let _ = self.conn.execute(
434 &format!("ALTER TABLE {escaped} ADD COLUMN cached_at REAL"),
435 [],
436 );
437 }
438
439 self.conn.execute(
440 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '2')",
441 [],
442 )?;
443
444 Ok(())
445 }
446
447 fn migrate_v2_to_v3(&self) -> Result<()> {
449 let _ = self
450 .conn
451 .execute("ALTER TABLE _tables ADD COLUMN tags TEXT", []);
452
453 self.conn.execute(
454 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '3')",
455 [],
456 )?;
457
458 Ok(())
459 }
460
461 fn migrate_v3_to_v4(&self) -> Result<()> {
463 let _ = self
464 .conn
465 .execute("ALTER TABLE _tables ADD COLUMN sse_specification TEXT", []);
466 let _ = self
467 .conn
468 .execute("ALTER TABLE _tables ADD COLUMN table_class TEXT", []);
469 let _ = self.conn.execute(
470 "ALTER TABLE _tables ADD COLUMN deletion_protection_enabled INTEGER DEFAULT 0",
471 [],
472 );
473
474 self.conn.execute(
475 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '4')",
476 [],
477 )?;
478
479 Ok(())
480 }
481
482 fn migrate_v4_to_v5(&self) -> Result<()> {
484 let mut stmt = self
485 .conn
486 .prepare("SELECT table_name, gsi_definitions, lsi_definitions FROM _tables")?;
487 let tables: Vec<(String, Option<String>, Option<String>)> = stmt
488 .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
489 .collect::<std::result::Result<Vec<_>, _>>()?;
490
491 for (table_name, gsi_json, lsi_json) in &tables {
492 if let Some(json) = gsi_json {
493 if let Ok(gsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
494 for gsi in &gsis {
495 if let Some(idx) = gsi.get("IndexName").and_then(|v| v.as_str()) {
496 let gsi_table = escape_table_name(&format!("{table_name}::gsi::{idx}"));
497 let idx_name =
498 escape_table_name(&format!("{table_name}::gsi::{idx}::base_key"));
499 let _ = self.conn.execute_batch(&format!(
500 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{gsi_table}\" (table_pk, table_sk)"
501 ));
502 }
503 }
504 }
505 }
506 if let Some(json) = lsi_json {
507 if let Ok(lsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
508 for lsi in &lsis {
509 if let Some(idx) = lsi.get("IndexName").and_then(|v| v.as_str()) {
510 let lsi_table = escape_table_name(&format!("{table_name}::lsi::{idx}"));
511 let idx_name =
512 escape_table_name(&format!("{table_name}::lsi::{idx}::base_key"));
513 let _ = self.conn.execute_batch(&format!(
514 "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{lsi_table}\" (base_pk, base_sk)"
515 ));
516 }
517 }
518 }
519 }
520 }
521
522 self.conn.execute(
523 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '5')",
524 [],
525 )?;
526
527 Ok(())
528 }
529
530 fn migrate_v5_to_v6(&self) -> Result<()> {
532 let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
533 let table_names: Vec<String> = stmt
534 .query_map([], |row| row.get(0))?
535 .collect::<std::result::Result<Vec<_>, _>>()?;
536
537 for table_name in &table_names {
538 let escaped = escape_table_name(table_name);
539 let _ = self.conn.execute(
540 &format!(
541 "ALTER TABLE \"{escaped}\" ADD COLUMN hash_prefix TEXT NOT NULL DEFAULT ''"
542 ),
543 [],
544 );
545 }
546
547 self.conn.execute(
548 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '6')",
549 [],
550 )?;
551
552 Ok(())
553 }
554
555 fn migrate_v6_to_v7(&self) -> Result<()> {
557 let _ = self.conn.execute(
558 "ALTER TABLE _tables ADD COLUMN on_demand_throughput TEXT",
559 [],
560 );
561
562 self.conn.execute(
563 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '7')",
564 [],
565 )?;
566
567 Ok(())
568 }
569
570 fn migrate_v7_to_v8(&self) -> Result<()> {
581 let _ = self
582 .conn
583 .execute("ALTER TABLE _tables ADD COLUMN table_id TEXT", []);
584
585 let names: Vec<String> = {
586 let mut stmt = self
587 .conn
588 .prepare("SELECT table_name FROM _tables WHERE table_id IS NULL")?;
589 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
590 rows.collect::<rusqlite::Result<Vec<String>>>()?
591 };
592 for name in names {
593 let id = uuid::Uuid::new_v4().to_string();
594 self.conn.execute(
595 "UPDATE _tables SET table_id = ?1 WHERE table_name = ?2",
596 params![id, name],
597 )?;
598 }
599
600 self.conn.execute(
601 "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '8')",
602 [],
603 )?;
604
605 Ok(())
606 }
607
608 pub fn conn(&self) -> &Connection {
610 &self.conn
611 }
612
613 pub fn conn_mut(&mut self) -> &mut Connection {
615 &mut self.conn
616 }
617
618 pub fn insert_table_metadata(&self, m: &CreateTableMetadata) -> Result<()> {
624 let table_name = m.table_name;
625 let (sql, params) = sql_builders::insert_table_metadata(m);
626 self.conn
627 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
628 self.metadata_cache.borrow_mut().remove(table_name);
629 Ok(())
630 }
631
632 pub fn get_table_metadata(&self, table_name: &str) -> Result<Option<TableMetadata>> {
638 if let Some(cached) = self.metadata_cache.borrow().get(table_name) {
640 return Ok(Some(cached.clone()));
641 }
642
643 let (sql, params) = sql_builders::get_table_metadata(table_name);
644 let mut stmt = self.conn.prepare(&sql)?;
645
646 let result = stmt.query_row(rusqlite::params_from_iter(params.iter()), row_to_metadata);
647
648 match result {
649 Ok(meta) => {
650 self.metadata_cache
651 .borrow_mut()
652 .insert(table_name.to_string(), meta.clone());
653 Ok(Some(meta))
654 }
655 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
656 Err(e) => Err(DynoxideError::from(e)),
657 }
658 }
659
660 pub fn delete_table_metadata(&self, table_name: &str) -> Result<bool> {
662 let (sql, params) = sql_builders::delete_table_metadata(table_name);
663 let affected = self
664 .conn
665 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
666 self.metadata_cache.borrow_mut().remove(table_name);
667 Ok(affected > 0)
668 }
669
670 pub fn update_table_metadata(
672 &self,
673 table_name: &str,
674 attribute_definitions: &str,
675 gsi_definitions: Option<&str>,
676 ) -> Result<()> {
677 self.conn.execute(
678 "UPDATE _tables SET attribute_definitions = ?1, gsi_definitions = ?2 WHERE table_name = ?3",
679 params![attribute_definitions, gsi_definitions, table_name],
680 )?;
681 self.metadata_cache.borrow_mut().remove(table_name);
682 Ok(())
683 }
684
685 pub fn update_provisioned_throughput(
687 &self,
688 table_name: &str,
689 provisioned_throughput: &str,
690 ) -> Result<()> {
691 self.conn.execute(
692 "UPDATE _tables SET provisioned_throughput = ?1 WHERE table_name = ?2",
693 params![provisioned_throughput, table_name],
694 )?;
695 self.metadata_cache.borrow_mut().remove(table_name);
696 Ok(())
697 }
698
699 pub fn clear_provisioned_throughput(&self, table_name: &str) -> Result<()> {
701 self.conn.execute(
702 "UPDATE _tables SET provisioned_throughput = NULL WHERE table_name = ?1",
703 params![table_name],
704 )?;
705 self.metadata_cache.borrow_mut().remove(table_name);
706 Ok(())
707 }
708
709 pub fn update_billing_mode(&self, table_name: &str, billing_mode: &str) -> Result<()> {
711 self.conn.execute(
712 "UPDATE _tables SET billing_mode = ?1 WHERE table_name = ?2",
713 params![billing_mode, table_name],
714 )?;
715 self.metadata_cache.borrow_mut().remove(table_name);
716 Ok(())
717 }
718
719 pub fn update_table_class(&self, table_name: &str, table_class: &str) -> Result<()> {
721 self.conn.execute(
722 "UPDATE _tables SET table_class = ?1 WHERE table_name = ?2",
723 params![table_class, table_name],
724 )?;
725 self.metadata_cache.borrow_mut().remove(table_name);
726 Ok(())
727 }
728
729 pub fn update_on_demand_throughput(
731 &self,
732 table_name: &str,
733 on_demand_throughput: &str,
734 ) -> Result<()> {
735 self.conn.execute(
736 "UPDATE _tables SET on_demand_throughput = ?1 WHERE table_name = ?2",
737 params![on_demand_throughput, table_name],
738 )?;
739 self.metadata_cache.borrow_mut().remove(table_name);
740 Ok(())
741 }
742
743 pub fn get_tags(&self, table_name: &str) -> Result<Vec<crate::types::Tag>> {
749 let tags_json: Option<String> = self.conn.query_row(
750 "SELECT tags FROM _tables WHERE table_name = ?1",
751 params![table_name],
752 |row| row.get(0),
753 )?;
754
755 match tags_json {
756 Some(json) => serde_json::from_str(&json)
757 .map_err(|e| DynoxideError::InternalServerError(format!("Bad tags JSON: {e}"))),
758 None => Ok(Vec::new()),
759 }
760 }
761
762 pub fn set_tags(&self, table_name: &str, new_tags: &[crate::types::Tag]) -> Result<()> {
764 use std::collections::BTreeMap;
765
766 let existing = self.get_tags(table_name)?;
767 let mut tag_map: BTreeMap<String, String> =
768 existing.into_iter().map(|t| (t.key, t.value)).collect();
769
770 for tag in new_tags {
771 tag_map.insert(tag.key.clone(), tag.value.clone());
772 }
773
774 if tag_map.len() > 50 {
775 return Err(DynoxideError::ValidationException(
776 "One or more parameter values were invalid: \
777 Too many tags: tag limit is 50"
778 .to_string(),
779 ));
780 }
781
782 let merged: Vec<crate::types::Tag> = tag_map
783 .into_iter()
784 .map(|(k, v)| crate::types::Tag { key: k, value: v })
785 .collect();
786
787 let json = serde_json::to_string(&merged)
788 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
789
790 self.conn.execute(
791 "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
792 params![json, table_name],
793 )?;
794 Ok(())
795 }
796
797 pub fn update_deletion_protection(&self, table_name: &str, enabled: bool) -> Result<()> {
799 self.conn.execute(
800 "UPDATE _tables SET deletion_protection_enabled = ?1 WHERE table_name = ?2",
801 params![enabled as i32, table_name],
802 )?;
803 self.metadata_cache.borrow_mut().remove(table_name);
804 Ok(())
805 }
806
807 pub fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<()> {
809 let mut tags = self.get_tags(table_name)?;
810 tags.retain(|t| !keys.contains(&t.key));
811
812 let json = if tags.is_empty() {
813 None
814 } else {
815 Some(
816 serde_json::to_string(&tags)
817 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
818 )
819 };
820
821 self.conn.execute(
822 "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
823 params![json, table_name],
824 )?;
825 Ok(())
826 }
827
828 pub fn list_table_names(&self) -> Result<Vec<String>> {
830 let (sql, params) = sql_builders::list_table_names();
831 let mut stmt = self.conn.prepare(&sql)?;
832 let names = stmt
833 .query_map(rusqlite::params_from_iter(params.iter()), |row| row.get(0))?
834 .collect::<std::result::Result<Vec<String>, _>>()?;
835 Ok(names)
836 }
837
838 pub fn table_exists(&self, table_name: &str) -> Result<bool> {
840 let (sql, params) = sql_builders::table_exists(table_name);
841 let count: i32 =
842 self.conn
843 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
844 row.get(0)
845 })?;
846 Ok(count > 0)
847 }
848
849 #[allow(dead_code)]
851 pub(crate) fn invalidate_metadata_cache(&self, table_name: &str) {
852 self.metadata_cache.borrow_mut().remove(table_name);
853 }
854
855 pub fn create_data_table(&self, table_name: &str) -> Result<()> {
861 let (sql, params) = sql_builders::create_data_table(table_name);
862 self.conn
863 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
864 Ok(())
865 }
866
867 pub fn drop_data_table(&self, table_name: &str) -> Result<()> {
869 let (sql, params) = sql_builders::drop_data_table(table_name);
870 self.conn
871 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
872 Ok(())
873 }
874
875 pub fn create_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
877 let (sql, _) = sql_builders::create_gsi_table(table_name, index_name);
878 self.conn.execute_batch(&sql)?;
879 Ok(())
880 }
881
882 pub fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
884 let (sql, params) = sql_builders::drop_gsi_table(table_name, index_name);
885 self.conn
886 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
887 Ok(())
888 }
889
890 #[allow(clippy::too_many_arguments)]
896 pub fn insert_gsi_item(
897 &self,
898 table_name: &str,
899 index_name: &str,
900 gsi_pk: &str,
901 gsi_sk: &str,
902 table_pk: &str,
903 table_sk: &str,
904 item_json: &str,
905 ) -> Result<()> {
906 let sql = sql_builders::gsi_insert_sql(table_name, index_name);
907 let params = sql_builders::gsi_insert_params(gsi_pk, gsi_sk, table_pk, table_sk, item_json);
908 self.conn
909 .prepare_cached(&sql)?
910 .execute(rusqlite::params_from_iter(params.iter()))?;
911 Ok(())
912 }
913
914 pub fn insert_gsi_items(
918 &self,
919 table_name: &str,
920 index_name: &str,
921 rows: &[crate::storage_backend::GsiItemRow],
922 ) -> Result<()> {
923 let sql = sql_builders::gsi_insert_sql(table_name, index_name);
924 let mut stmt = self.conn.prepare_cached(&sql)?;
925 for row in rows {
926 let params = sql_builders::gsi_insert_params(
927 &row.gsi_pk,
928 &row.gsi_sk,
929 &row.table_pk,
930 &row.table_sk,
931 &row.item_json,
932 );
933 stmt.execute(rusqlite::params_from_iter(params.iter()))?;
934 }
935 Ok(())
936 }
937
938 pub fn delete_gsi_item(
940 &self,
941 table_name: &str,
942 index_name: &str,
943 table_pk: &str,
944 table_sk: &str,
945 ) -> Result<()> {
946 let (sql, params) =
947 sql_builders::delete_gsi_item(table_name, index_name, table_pk, table_sk);
948 self.conn
949 .prepare_cached(&sql)?
950 .execute(rusqlite::params_from_iter(params.iter()))?;
951 Ok(())
952 }
953
954 pub fn query_gsi_items(
956 &self,
957 table_name: &str,
958 index_name: &str,
959 gsi_pk: &str,
960 params: &QueryParams,
961 ) -> Result<Vec<(String, String, String)>> {
962 let (sql, params_vec) =
963 sql_builders::query_gsi_items(table_name, index_name, gsi_pk, params);
964 let mut stmt = self.conn.prepare(&sql)?;
965 let rows = stmt
966 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
967 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
968 })?
969 .collect::<std::result::Result<Vec<_>, _>>()?;
970
971 Ok(rows)
972 }
973
974 pub fn scan_gsi_items(
976 &self,
977 table_name: &str,
978 index_name: &str,
979 params: &ScanParams,
980 ) -> Result<Vec<(String, String, String)>> {
981 let (sql, params_vec) = sql_builders::scan_gsi_items(table_name, index_name, params);
982 let mut stmt = self.conn.prepare(&sql)?;
983 let rows = stmt
984 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
985 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
986 })?
987 .collect::<std::result::Result<Vec<_>, _>>()?;
988
989 Ok(rows)
990 }
991
992 pub fn create_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
998 let (sql, _) = sql_builders::create_lsi_table(table_name, index_name);
999 self.conn.execute_batch(&sql)?;
1000 Ok(())
1001 }
1002
1003 pub fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1005 let (sql, params) = sql_builders::drop_lsi_table(table_name, index_name);
1006 self.conn
1007 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
1008 Ok(())
1009 }
1010
1011 #[allow(clippy::too_many_arguments)]
1017 pub fn insert_lsi_item(
1018 &self,
1019 table_name: &str,
1020 index_name: &str,
1021 pk: &str,
1022 sk: &str,
1023 base_pk: &str,
1024 base_sk: &str,
1025 item_json: &str,
1026 ) -> Result<()> {
1027 let sql = sql_builders::lsi_insert_sql(table_name, index_name);
1028 let params = sql_builders::lsi_insert_params(pk, sk, base_pk, base_sk, item_json);
1029 self.conn
1030 .prepare_cached(&sql)?
1031 .execute(rusqlite::params_from_iter(params.iter()))?;
1032 Ok(())
1033 }
1034
1035 pub fn delete_lsi_item(
1037 &self,
1038 table_name: &str,
1039 index_name: &str,
1040 base_pk: &str,
1041 base_sk: &str,
1042 ) -> Result<()> {
1043 let (sql, params) = sql_builders::delete_lsi_item(table_name, index_name, base_pk, base_sk);
1044 self.conn
1045 .prepare_cached(&sql)?
1046 .execute(rusqlite::params_from_iter(params.iter()))?;
1047 Ok(())
1048 }
1049
1050 pub fn query_lsi_items(
1052 &self,
1053 table_name: &str,
1054 index_name: &str,
1055 pk: &str,
1056 params: &QueryParams,
1057 ) -> Result<Vec<(String, String, String)>> {
1058 let (sql, params_vec) = sql_builders::query_lsi_items(table_name, index_name, pk, params);
1059 let mut stmt = self.conn.prepare(&sql)?;
1060 let rows = stmt
1061 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1062 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1063 })?
1064 .collect::<std::result::Result<Vec<_>, _>>()?;
1065
1066 Ok(rows)
1067 }
1068
1069 pub fn scan_lsi_items(
1071 &self,
1072 table_name: &str,
1073 index_name: &str,
1074 params: &ScanParams,
1075 ) -> Result<Vec<(String, String, String)>> {
1076 let (sql, params_vec) = sql_builders::scan_lsi_items(table_name, index_name, params);
1077 let mut stmt = self.conn.prepare(&sql)?;
1078 let rows = stmt
1079 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1080 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1081 })?
1082 .collect::<std::result::Result<Vec<_>, _>>()?;
1083
1084 Ok(rows)
1085 }
1086
1087 pub fn begin_transaction(&self) -> Result<()> {
1093 self.conn.execute_batch(sql_builders::BEGIN)?;
1094 Ok(())
1095 }
1096
1097 pub fn commit(&self) -> Result<()> {
1099 self.conn.execute_batch(sql_builders::COMMIT)?;
1100 Ok(())
1101 }
1102
1103 pub fn rollback(&self) -> Result<()> {
1105 self.conn.execute_batch(sql_builders::ROLLBACK)?;
1106 Ok(())
1107 }
1108
1109 pub fn enable_bulk_loading(&self) -> Result<()> {
1115 self.conn.execute_batch(
1116 "PRAGMA synchronous = OFF;
1117 PRAGMA cache_size = -64000;
1118 PRAGMA temp_store = MEMORY;
1119 PRAGMA mmap_size = 268435456;",
1120 )?;
1121 Ok(())
1122 }
1123
1124 pub fn disable_bulk_loading(&self) -> Result<()> {
1126 self.conn.execute_batch(
1127 "PRAGMA synchronous = NORMAL;
1128 PRAGMA cache_size = -2000;
1129 PRAGMA temp_store = DEFAULT;
1130 PRAGMA mmap_size = 0;",
1131 )?;
1132 Ok(())
1133 }
1134
1135 pub fn put_item(
1141 &self,
1142 table_name: &str,
1143 pk: &str,
1144 sk: &str,
1145 item_json: &str,
1146 item_size: usize,
1147 ) -> Result<Option<String>> {
1148 self.put_item_with_hash(table_name, pk, sk, item_json, item_size, "")
1149 }
1150
1151 pub fn put_item_with_hash(
1153 &self,
1154 table_name: &str,
1155 pk: &str,
1156 sk: &str,
1157 item_json: &str,
1158 item_size: usize,
1159 hash_prefix: &str,
1160 ) -> Result<Option<String>> {
1161 let old_item = self.get_item(table_name, pk, sk)?;
1163
1164 let (sql, params) =
1165 sql_builders::put_item_with_hash(table_name, pk, sk, item_json, item_size, hash_prefix);
1166 self.conn
1167 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
1168
1169 Ok(old_item)
1170 }
1171
1172 pub fn put_base_items(
1177 &self,
1178 table_name: &str,
1179 rows: &[crate::storage_backend::BaseItemRow],
1180 ) -> Result<()> {
1181 let escaped = escape_table_name(table_name);
1182 let sql = format!(
1183 "INSERT OR REPLACE INTO \"{escaped}\" (pk, sk, item_json, item_size, cached_at, hash_prefix) \
1184 VALUES (?1, ?2, ?3, ?4, ?5, ?6)"
1185 );
1186 let mut stmt = self.conn.prepare_cached(&sql)?;
1187 for row in rows {
1188 stmt.execute(params![
1189 row.pk,
1190 row.sk,
1191 row.item_json,
1192 row.item_size as i64,
1193 row.cached_at,
1194 row.hash_prefix
1195 ])?;
1196 }
1197 Ok(())
1198 }
1199
1200 pub fn get_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1202 let (sql, params) = sql_builders::get_item(table_name, pk, sk);
1203 let result = self
1204 .conn
1205 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1206 row.get(0)
1207 });
1208
1209 match result {
1210 Ok(json) => Ok(Some(json)),
1211 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1212 Err(e) => Err(DynoxideError::from(e)),
1213 }
1214 }
1215
1216 pub fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64> {
1218 let (sql, params) = sql_builders::get_partition_size(table_name, pk);
1219 let size: i64 =
1220 self.conn
1221 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1222 row.get(0)
1223 })?;
1224 Ok(size)
1225 }
1226
1227 pub fn get_lsi_partition_size(
1230 &self,
1231 table_name: &str,
1232 index_name: &str,
1233 pk: &str,
1234 ) -> Result<i64> {
1235 let (sql, params) = sql_builders::get_lsi_partition_size(table_name, index_name, pk);
1236 let size: i64 =
1237 self.conn
1238 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1239 row.get(0)
1240 })?;
1241 Ok(size)
1242 }
1243
1244 pub fn delete_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1246 let old_item = self.get_item(table_name, pk, sk)?;
1247
1248 let (sql, params) = sql_builders::delete_item(table_name, pk, sk);
1249 self.conn
1250 .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
1251
1252 Ok(old_item)
1253 }
1254
1255 pub fn query_items(
1260 &self,
1261 table_name: &str,
1262 pk: &str,
1263 params: &QueryParams,
1264 ) -> Result<Vec<(String, String, String)>> {
1265 let (sql, params_vec) = sql_builders::query_items(table_name, pk, params);
1266 let mut stmt = self.conn.prepare(&sql)?;
1267 let rows = stmt
1268 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1269 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1270 })?
1271 .collect::<std::result::Result<Vec<_>, _>>()?;
1272
1273 Ok(rows)
1274 }
1275
1276 pub fn scan_items(
1281 &self,
1282 table_name: &str,
1283 params: &ScanParams,
1284 ) -> Result<Vec<(String, String, String)>> {
1285 let (sql, params_vec) = sql_builders::scan_items(table_name, params);
1286 let mut stmt = self.conn.prepare(&sql)?;
1287 let rows = stmt
1288 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1289 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1290 })?
1291 .collect::<std::result::Result<Vec<_>, _>>()?;
1292
1293 Ok(rows)
1294 }
1295
1296 pub fn count_items(&self, table_name: &str) -> Result<i64> {
1298 let (sql, params) = sql_builders::count_items(table_name);
1299 let count: i64 =
1300 self.conn
1301 .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1302 row.get(0)
1303 })?;
1304 Ok(count)
1305 }
1306
1307 pub fn db_path(&self) -> Option<String> {
1313 self.conn
1314 .path()
1315 .filter(|p| !p.is_empty())
1316 .map(|p| p.to_owned())
1317 }
1318
1319 pub fn db_size_bytes(&self) -> Result<u64> {
1321 let size: i64 = self.conn.query_row(
1322 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1323 [],
1324 |row| row.get(0),
1325 )?;
1326 Ok(size as u64)
1327 }
1328
1329 pub fn table_count(&self) -> Result<usize> {
1331 let count: i64 = self
1332 .conn
1333 .query_row("SELECT COUNT(*) FROM _tables", [], |row| row.get(0))?;
1334 Ok(count as usize)
1335 }
1336
1337 pub fn table_stats(&self) -> Result<Vec<TableStats>> {
1341 let table_names = self.list_table_names()?;
1342 let mut stats = Vec::with_capacity(table_names.len());
1343 for name in table_names {
1344 let sql = format!(
1345 "SELECT COUNT(*), COALESCE(SUM(item_size), 0) FROM \"{}\"",
1346 escape_table_name(&name)
1347 );
1348 let (item_count, size_bytes): (i64, i64) = self
1349 .conn
1350 .query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
1351 stats.push(TableStats {
1352 table_name: name,
1353 item_count,
1354 size_bytes: size_bytes as u64,
1355 });
1356 }
1357 Ok(stats)
1358 }
1359
1360 pub fn database_info(&self) -> Result<DatabaseInfo> {
1365 let path = self.db_path();
1366 let size_bytes = self.db_size_bytes()?;
1367 let table_count = self.table_count()?;
1368 let stats = self.table_stats()?;
1369
1370 let mut table_details = Vec::with_capacity(stats.len());
1371 for s in stats {
1372 let metadata = self.get_table_metadata(&s.table_name)?;
1373 table_details.push(TableInfoEntry { stats: s, metadata });
1374 }
1375
1376 Ok(DatabaseInfo {
1377 path,
1378 size_bytes,
1379 table_count,
1380 tables: table_details,
1381 })
1382 }
1383
1384 pub fn vacuum_into(&self, path: &str) -> Result<()> {
1391 if path.contains('\0') {
1392 return Err(DynoxideError::ValidationException(
1393 "path contains null byte".to_string(),
1394 ));
1395 }
1396 self.conn
1397 .execute_batch(&format!("VACUUM INTO '{}'", path.replace('\'', "''")))?;
1398 Ok(())
1399 }
1400
1401 pub fn vacuum(&self) -> Result<()> {
1403 self.conn.execute_batch("VACUUM")?;
1404 Ok(())
1405 }
1406
1407 pub fn restore_from(&mut self, path: &str) -> Result<()> {
1411 let source = Connection::open(path)?;
1412 self.restore_from_connection(&source)
1413 }
1414
1415 pub fn backup_to_memory(&self) -> Result<Connection> {
1420 let mut dest = Connection::open_in_memory()?;
1421 {
1422 let backup = rusqlite::backup::Backup::new(&self.conn, &mut dest)?;
1423 backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1424 }
1425 Ok(dest)
1426 }
1427
1428 pub fn restore_from_connection(&mut self, source: &Connection) -> Result<()> {
1433 let backup = rusqlite::backup::Backup::new(source, &mut self.conn)?;
1434 backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1435 self.metadata_cache.borrow_mut().clear();
1436 Ok(())
1437 }
1438
1439 pub fn connection_size_bytes(conn: &Connection) -> Result<u64> {
1441 let size: i64 = conn.query_row(
1442 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1443 [],
1444 |row| row.get(0),
1445 )?;
1446 Ok(size as u64)
1447 }
1448
1449 pub fn enable_stream(&self, table_name: &str, view_type: &str, label: &str) -> Result<()> {
1455 self.conn.execute(
1456 "UPDATE _tables SET stream_enabled = 1, stream_view_type = ?1, stream_label = ?2 WHERE table_name = ?3",
1457 params![view_type, label, table_name],
1458 )?;
1459 self.metadata_cache.borrow_mut().remove(table_name);
1460 Ok(())
1461 }
1462
1463 pub fn disable_stream(&self, table_name: &str) -> Result<()> {
1465 self.conn.execute(
1466 "UPDATE _tables SET stream_enabled = 0 WHERE table_name = ?1",
1467 params![table_name],
1468 )?;
1469 self.metadata_cache.borrow_mut().remove(table_name);
1470 Ok(())
1471 }
1472
1473 #[allow(clippy::too_many_arguments)]
1475 pub fn insert_stream_record(
1476 &self,
1477 table_name: &str,
1478 event_name: &str,
1479 keys_json: &str,
1480 new_image: Option<&str>,
1481 old_image: Option<&str>,
1482 sequence_number: &str,
1483 shard_id: &str,
1484 created_at: i64,
1485 ) -> Result<()> {
1486 self.insert_stream_record_with_identity(
1487 table_name,
1488 event_name,
1489 keys_json,
1490 new_image,
1491 old_image,
1492 sequence_number,
1493 shard_id,
1494 created_at,
1495 None,
1496 )
1497 }
1498
1499 #[allow(clippy::too_many_arguments)]
1501 pub fn insert_stream_record_with_identity(
1502 &self,
1503 table_name: &str,
1504 event_name: &str,
1505 keys_json: &str,
1506 new_image: Option<&str>,
1507 old_image: Option<&str>,
1508 sequence_number: &str,
1509 shard_id: &str,
1510 created_at: i64,
1511 user_identity: Option<&str>,
1512 ) -> Result<()> {
1513 self.conn.execute(
1514 "INSERT INTO _stream_records (table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity)
1515 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1516 params![table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity],
1517 )?;
1518 Ok(())
1519 }
1520
1521 pub fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64> {
1523 let result: std::result::Result<i64, _> = self.conn.query_row(
1524 "SELECT COALESCE(MAX(CAST(sequence_number AS INTEGER)), 0) + 1 FROM _stream_records WHERE table_name = ?1",
1525 params![table_name],
1526 |row| row.get(0),
1527 );
1528 match result {
1529 Ok(n) => Ok(n),
1530 Err(_) => Ok(1),
1531 }
1532 }
1533
1534 pub fn get_stream_records(
1536 &self,
1537 table_name: &str,
1538 shard_id: &str,
1539 after_sequence: i64,
1540 limit: usize,
1541 ) -> Result<Vec<StreamRecord>> {
1542 let mut stmt = self.conn.prepare(
1543 "SELECT event_name, keys_json, new_image, old_image, sequence_number, created_at, user_identity
1544 FROM _stream_records
1545 WHERE table_name = ?1 AND shard_id = ?2 AND CAST(sequence_number AS INTEGER) > ?3
1546 ORDER BY CAST(sequence_number AS INTEGER) ASC
1547 LIMIT ?4",
1548 )?;
1549 let rows = stmt
1550 .query_map(
1551 params![table_name, shard_id, after_sequence, limit as i64],
1552 |row| {
1553 Ok(StreamRecord {
1554 event_name: row.get(0)?,
1555 keys_json: row.get(1)?,
1556 new_image: row.get(2)?,
1557 old_image: row.get(3)?,
1558 sequence_number: row.get(4)?,
1559 created_at: row.get(5)?,
1560 user_identity: row.get(6)?,
1561 })
1562 },
1563 )?
1564 .collect::<std::result::Result<Vec<_>, _>>()?;
1565 Ok(rows)
1566 }
1567
1568 pub fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1570 let sql = format!(
1571 "SELECT {} FROM _tables WHERE stream_enabled = 1 ORDER BY table_name",
1572 sql_builders::TABLE_METADATA_COLUMNS
1573 );
1574 let mut stmt = self.conn.prepare(&sql)?;
1575 let rows = stmt
1576 .query_map([], row_to_metadata)?
1577 .collect::<std::result::Result<Vec<_>, _>>()?;
1578 Ok(rows)
1579 }
1580
1581 pub fn update_ttl_config(
1587 &self,
1588 table_name: &str,
1589 attribute_name: Option<&str>,
1590 enabled: bool,
1591 ) -> Result<()> {
1592 self.conn.execute(
1593 "UPDATE _tables SET ttl_attribute = ?1, ttl_enabled = ?2 WHERE table_name = ?3",
1594 params![attribute_name, enabled as i32, table_name],
1595 )?;
1596 self.metadata_cache.borrow_mut().remove(table_name);
1597 Ok(())
1598 }
1599
1600 pub fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1602 let sql = format!(
1603 "SELECT {} FROM _tables WHERE ttl_enabled = 1 ORDER BY table_name",
1604 sql_builders::TABLE_METADATA_COLUMNS
1605 );
1606 let mut stmt = self.conn.prepare(&sql)?;
1607 let rows = stmt
1608 .query_map([], row_to_metadata)?
1609 .collect::<std::result::Result<Vec<_>, _>>()?;
1610 Ok(rows)
1611 }
1612
1613 pub fn get_shard_sequence_range(
1615 &self,
1616 table_name: &str,
1617 shard_id: &str,
1618 ) -> Result<(Option<String>, Option<String>)> {
1619 let result: std::result::Result<(Option<String>, Option<String>), _> = self.conn.query_row(
1620 "SELECT MIN(sequence_number), MAX(sequence_number) FROM _stream_records WHERE table_name = ?1 AND shard_id = ?2",
1621 params![table_name, shard_id],
1622 |row| Ok((row.get(0)?, row.get(1)?)),
1623 );
1624 match result {
1625 Ok(range) => Ok(range),
1626 Err(_) => Ok((None, None)),
1627 }
1628 }
1629
1630 pub fn touch_cached_at(
1636 &self,
1637 table_name: &str,
1638 pk: &str,
1639 sk: &str,
1640 timestamp: f64,
1641 ) -> Result<()> {
1642 let sql = format!(
1643 "UPDATE \"{}\" SET cached_at = ?1 WHERE pk = ?2 AND sk = ?3",
1644 escape_table_name(table_name)
1645 );
1646 self.conn.execute(&sql, params![timestamp, pk, sk])?;
1647 Ok(())
1648 }
1649
1650 pub fn get_lru_items(
1655 &self,
1656 table_name: &str,
1657 limit: usize,
1658 ) -> Result<Vec<(String, String, i64)>> {
1659 let sql = format!(
1660 "SELECT pk, sk, item_size FROM \"{}\" WHERE cached_at IS NOT NULL ORDER BY cached_at ASC LIMIT ?1",
1661 escape_table_name(table_name)
1662 );
1663 let mut stmt = self.conn.prepare(&sql)?;
1664 let rows = stmt
1665 .query_map(params![limit as i64], |row| {
1666 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1667 })?
1668 .collect::<std::result::Result<Vec<_>, _>>()?;
1669 Ok(rows)
1670 }
1671}
1672
1673#[derive(Debug, Clone)]
1675pub struct StreamRecord {
1676 pub event_name: String,
1677 pub keys_json: String,
1678 pub new_image: Option<String>,
1679 pub old_image: Option<String>,
1680 pub sequence_number: String,
1681 pub created_at: i64,
1682 pub user_identity: Option<String>,
1683}
1684
1685#[derive(Debug, Clone)]
1687pub struct TableStats {
1688 pub table_name: String,
1689 pub item_count: i64,
1690 pub size_bytes: u64,
1691}
1692
1693#[derive(Debug, Clone)]
1695pub struct DatabaseInfo {
1696 pub path: Option<String>,
1697 pub size_bytes: u64,
1698 pub table_count: usize,
1699 pub tables: Vec<TableInfoEntry>,
1700}
1701
1702#[derive(Debug, Clone)]
1704pub struct TableInfoEntry {
1705 pub stats: TableStats,
1706 pub metadata: Option<TableMetadata>,
1707}
1708
1709#[derive(Debug, Clone)]
1715pub struct TableMetadata {
1716 pub table_name: String,
1717 pub key_schema: String,
1718 pub attribute_definitions: String,
1719 pub gsi_definitions: Option<String>,
1720 pub lsi_definitions: Option<String>,
1721 pub stream_enabled: bool,
1722 pub stream_view_type: Option<String>,
1723 pub stream_label: Option<String>,
1724 pub ttl_attribute: Option<String>,
1725 pub ttl_enabled: bool,
1726 pub created_at: i64,
1727 pub table_status: String,
1728 pub billing_mode: Option<String>,
1729 pub provisioned_throughput: Option<String>,
1730 pub sse_specification: Option<String>,
1731 pub table_class: Option<String>,
1732 pub deletion_protection_enabled: bool,
1733 pub on_demand_throughput: Option<String>,
1734 pub table_id: Option<String>,
1737}
1738
1739#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
1741fn row_to_metadata(row: &rusqlite::Row) -> rusqlite::Result<TableMetadata> {
1742 Ok(TableMetadata {
1743 table_name: row.get(0)?,
1744 key_schema: row.get(1)?,
1745 attribute_definitions: row.get(2)?,
1746 gsi_definitions: row.get(3)?,
1747 lsi_definitions: row.get(4)?,
1748 stream_enabled: row.get::<_, i32>(5)? != 0,
1749 stream_view_type: row.get(6)?,
1750 stream_label: row.get(7)?,
1751 ttl_attribute: row.get(8)?,
1752 ttl_enabled: row.get::<_, i32>(9)? != 0,
1753 created_at: row.get(10)?,
1754 table_status: row.get(11)?,
1755 billing_mode: row.get(12)?,
1756 provisioned_throughput: row.get(13)?,
1757 sse_specification: row.get(14)?,
1758 table_class: row.get(15)?,
1759 deletion_protection_enabled: row.get::<_, i32>(16).unwrap_or(0) != 0,
1760 on_demand_throughput: row.get(17)?,
1761 table_id: row.get(18)?,
1762 })
1763}
1764
1765#[cfg(all(test, any(feature = "native-sqlite", feature = "_has-encryption")))]
1766mod tests {
1767 use super::*;
1768
1769 fn test_storage() -> Storage {
1770 Storage::memory().expect("Failed to create in-memory storage")
1771 }
1772
1773 #[test]
1774 fn test_initialize_creates_metadata_tables() {
1775 let storage = test_storage();
1776 let version: String = storage
1778 .conn()
1779 .query_row(
1780 "SELECT value FROM _config WHERE key = 'schema_version'",
1781 [],
1782 |row| row.get(0),
1783 )
1784 .unwrap();
1785 assert_eq!(version, SCHEMA_VERSION);
1786 }
1787
1788 #[test]
1789 fn test_migrate_v6_to_v7_adds_on_demand_throughput_column() {
1790 let tmp = tempfile::NamedTempFile::new().unwrap();
1794 let path = tmp.path().to_str().unwrap().to_string();
1795
1796 {
1799 let conn = Connection::open(&path).unwrap();
1800 conn.execute_batch(
1801 "CREATE TABLE _config (key TEXT PRIMARY KEY, value TEXT NOT NULL);
1802 CREATE TABLE _tables (
1803 table_name TEXT PRIMARY KEY,
1804 key_schema TEXT NOT NULL,
1805 attribute_definitions TEXT NOT NULL,
1806 gsi_definitions TEXT,
1807 lsi_definitions TEXT,
1808 stream_enabled INTEGER DEFAULT 0,
1809 stream_view_type TEXT,
1810 stream_label TEXT,
1811 ttl_attribute TEXT,
1812 ttl_enabled INTEGER DEFAULT 0,
1813 created_at INTEGER NOT NULL,
1814 table_status TEXT NOT NULL DEFAULT 'ACTIVE',
1815 billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
1816 provisioned_throughput TEXT,
1817 tags TEXT,
1818 sse_specification TEXT,
1819 table_class TEXT,
1820 deletion_protection_enabled INTEGER DEFAULT 0
1821 );",
1822 )
1823 .unwrap();
1824 conn.execute(
1825 "INSERT INTO _config (key, value) VALUES ('schema_version', '6')",
1826 [],
1827 )
1828 .unwrap();
1829 conn.execute(
1830 "INSERT INTO _tables (table_name, key_schema, attribute_definitions, created_at) \
1831 VALUES ('LegacyTable', ?1, ?2, 0)",
1832 params![
1833 r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
1834 r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
1835 ],
1836 )
1837 .unwrap();
1838 }
1839
1840 let storage = Storage::new(&path).unwrap();
1843 let version: String = storage
1844 .conn()
1845 .query_row(
1846 "SELECT value FROM _config WHERE key = 'schema_version'",
1847 [],
1848 |r| r.get(0),
1849 )
1850 .unwrap();
1851 assert_eq!(version, SCHEMA_VERSION);
1852
1853 let meta = storage.get_table_metadata("LegacyTable").unwrap().unwrap();
1856 assert_eq!(meta.table_name, "LegacyTable");
1857 assert!(meta.on_demand_throughput.is_none());
1858
1859 let col: Option<String> = storage
1861 .conn()
1862 .query_row(
1863 "SELECT on_demand_throughput FROM _tables WHERE table_name = 'LegacyTable'",
1864 [],
1865 |r| r.get(0),
1866 )
1867 .unwrap();
1868 assert!(col.is_none());
1869 }
1870
1871 #[test]
1875 fn test_migrate_v7_to_v8_backfills_table_id() {
1876 let tmp = tempfile::NamedTempFile::new().unwrap();
1877 let path = tmp.path().to_str().unwrap().to_string();
1878
1879 {
1882 let conn = Connection::open(&path).unwrap();
1883 conn.execute_batch(
1884 "CREATE TABLE _config (key TEXT PRIMARY KEY, value TEXT NOT NULL);
1885 CREATE TABLE _tables (
1886 table_name TEXT PRIMARY KEY,
1887 key_schema TEXT NOT NULL,
1888 attribute_definitions TEXT NOT NULL,
1889 gsi_definitions TEXT,
1890 lsi_definitions TEXT,
1891 stream_enabled INTEGER DEFAULT 0,
1892 stream_view_type TEXT,
1893 stream_label TEXT,
1894 ttl_attribute TEXT,
1895 ttl_enabled INTEGER DEFAULT 0,
1896 created_at INTEGER NOT NULL,
1897 table_status TEXT NOT NULL DEFAULT 'ACTIVE',
1898 billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
1899 provisioned_throughput TEXT,
1900 tags TEXT,
1901 sse_specification TEXT,
1902 table_class TEXT,
1903 deletion_protection_enabled INTEGER DEFAULT 0,
1904 on_demand_throughput TEXT
1905 );",
1906 )
1907 .unwrap();
1908 conn.execute(
1909 "INSERT INTO _config (key, value) VALUES ('schema_version', '7')",
1910 [],
1911 )
1912 .unwrap();
1913 conn.execute(
1914 "INSERT INTO _tables (table_name, key_schema, attribute_definitions, created_at) \
1915 VALUES ('LegacyTable', ?1, ?2, 0)",
1916 params![
1917 r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
1918 r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
1919 ],
1920 )
1921 .unwrap();
1922 }
1923
1924 let storage = Storage::new(&path).unwrap();
1926 let version: String = storage
1927 .conn()
1928 .query_row(
1929 "SELECT value FROM _config WHERE key = 'schema_version'",
1930 [],
1931 |r| r.get(0),
1932 )
1933 .unwrap();
1934 assert_eq!(version, SCHEMA_VERSION);
1935
1936 let meta = storage.get_table_metadata("LegacyTable").unwrap().unwrap();
1939 let id = meta.table_id.expect("legacy table should be backfilled");
1940 assert!(!id.is_empty());
1941
1942 drop(storage);
1944 let storage2 = Storage::new(&path).unwrap();
1945 let meta2 = storage2.get_table_metadata("LegacyTable").unwrap().unwrap();
1946 assert_eq!(meta2.table_id.as_deref(), Some(id.as_str()));
1947 }
1948
1949 #[test]
1950 fn test_wal_mode_enabled() {
1951 let storage = test_storage();
1952 let mode: String = storage
1953 .conn()
1954 .query_row("PRAGMA journal_mode", [], |row| row.get(0))
1955 .unwrap();
1956 assert!(mode == "wal" || mode == "memory", "Got mode: {mode}");
1958 }
1959
1960 #[test]
1961 fn test_table_metadata_crud() {
1962 let storage = test_storage();
1963
1964 assert!(!storage.table_exists("TestTable").unwrap());
1966 assert!(storage.list_table_names().unwrap().is_empty());
1967
1968 storage
1970 .insert_table_metadata(&CreateTableMetadata {
1971 table_name: "TestTable",
1972 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
1973 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
1974 created_at: 1000000,
1975 ..Default::default()
1976 })
1977 .unwrap();
1978
1979 assert!(storage.table_exists("TestTable").unwrap());
1980 assert_eq!(storage.list_table_names().unwrap(), vec!["TestTable"]);
1981
1982 let meta = storage.get_table_metadata("TestTable").unwrap().unwrap();
1984 assert_eq!(meta.table_name, "TestTable");
1985 assert_eq!(meta.table_status, "ACTIVE");
1986 assert_eq!(meta.created_at, 1000000);
1987
1988 assert!(storage.delete_table_metadata("TestTable").unwrap());
1990 assert!(!storage.table_exists("TestTable").unwrap());
1991 }
1992
1993 #[test]
1994 fn test_create_and_drop_data_table() {
1995 let storage = test_storage();
1996 storage.create_data_table("MyTable").unwrap();
1997
1998 storage
2000 .put_item("MyTable", "pk1", "", r#"{"pk":{"S":"pk1"}}"#, 10)
2001 .unwrap();
2002
2003 let item = storage.get_item("MyTable", "pk1", "").unwrap();
2004 assert!(item.is_some());
2005
2006 storage.drop_data_table("MyTable").unwrap();
2007 }
2008
2009 #[test]
2010 fn test_item_crud() {
2011 let storage = test_storage();
2012 storage.create_data_table("Items").unwrap();
2013
2014 let old = storage
2016 .put_item(
2017 "Items",
2018 "user#1",
2019 "profile",
2020 r#"{"name":{"S":"Alice"}}"#,
2021 20,
2022 )
2023 .unwrap();
2024 assert!(old.is_none()); let item = storage.get_item("Items", "user#1", "profile").unwrap();
2028 assert_eq!(item.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2029
2030 let old = storage
2032 .put_item("Items", "user#1", "profile", r#"{"name":{"S":"Bob"}}"#, 18)
2033 .unwrap();
2034 assert_eq!(old.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2035
2036 let deleted = storage.delete_item("Items", "user#1", "profile").unwrap();
2038 assert_eq!(deleted.unwrap(), r#"{"name":{"S":"Bob"}}"#);
2039
2040 assert!(
2042 storage
2043 .get_item("Items", "user#1", "profile")
2044 .unwrap()
2045 .is_none()
2046 );
2047 }
2048
2049 #[test]
2050 fn test_query_items() {
2051 let storage = test_storage();
2052 storage.create_data_table("Orders").unwrap();
2053
2054 for i in 1..=5 {
2056 let sk = format!("order#{i:03}");
2057 let json = format!(r#"{{"id":{{"N":"{i}"}}}}"#);
2058 storage
2059 .put_item("Orders", "user#1", &sk, &json, 10)
2060 .unwrap();
2061 }
2062
2063 let results = storage
2065 .query_items(
2066 "Orders",
2067 "user#1",
2068 &QueryParams {
2069 forward: true,
2070 ..Default::default()
2071 },
2072 )
2073 .unwrap();
2074 assert_eq!(results.len(), 5);
2075 assert_eq!(results[0].1, "order#001"); let results = storage
2079 .query_items(
2080 "Orders",
2081 "user#1",
2082 &QueryParams {
2083 forward: true,
2084 limit: Some(2),
2085 ..Default::default()
2086 },
2087 )
2088 .unwrap();
2089 assert_eq!(results.len(), 2);
2090
2091 let results = storage
2093 .query_items(
2094 "Orders",
2095 "user#1",
2096 &QueryParams {
2097 forward: false,
2098 limit: Some(2),
2099 ..Default::default()
2100 },
2101 )
2102 .unwrap();
2103 assert_eq!(results.len(), 2);
2104 assert_eq!(results[0].1, "order#005"); }
2106
2107 #[test]
2108 fn test_scan_items() {
2109 let storage = test_storage();
2110 storage.create_data_table("ScanTest").unwrap();
2111
2112 storage.put_item("ScanTest", "a", "1", r#"{}"#, 2).unwrap();
2113 storage.put_item("ScanTest", "b", "2", r#"{}"#, 2).unwrap();
2114 storage.put_item("ScanTest", "c", "3", r#"{}"#, 2).unwrap();
2115
2116 let results = storage.scan_items("ScanTest", &Default::default()).unwrap();
2117 assert_eq!(results.len(), 3);
2118
2119 let results = storage
2121 .scan_items(
2122 "ScanTest",
2123 &ScanParams {
2124 limit: Some(2),
2125 ..Default::default()
2126 },
2127 )
2128 .unwrap();
2129 assert_eq!(results.len(), 2);
2130
2131 let results = storage
2133 .scan_items(
2134 "ScanTest",
2135 &ScanParams {
2136 limit: Some(2),
2137 exclusive_start_pk: Some("a"),
2138 exclusive_start_sk: Some("1"),
2139 ..Default::default()
2140 },
2141 )
2142 .unwrap();
2143 assert_eq!(results.len(), 2);
2144 assert_eq!(results[0].0, "b"); }
2146
2147 #[test]
2148 fn test_count_items() {
2149 let storage = test_storage();
2150 storage.create_data_table("CountTest").unwrap();
2151
2152 assert_eq!(storage.count_items("CountTest").unwrap(), 0);
2153
2154 storage.put_item("CountTest", "a", "", r#"{}"#, 2).unwrap();
2155 storage.put_item("CountTest", "b", "", r#"{}"#, 2).unwrap();
2156
2157 assert_eq!(storage.count_items("CountTest").unwrap(), 2);
2158 }
2159
2160 #[test]
2161 fn test_gsi_table_lifecycle() {
2162 let storage = test_storage();
2163 storage.create_gsi_table("Orders", "ByDate").unwrap();
2164
2165 let gsi_name = "Orders::gsi::ByDate";
2167 let sql = format!(
2168 "INSERT INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
2169 gsi_name.replace('"', "\"\"")
2170 );
2171 storage
2172 .conn()
2173 .execute(
2174 &sql,
2175 params!["2024-01-01", "001", "user#1", "order#001", r#"{}"#],
2176 )
2177 .unwrap();
2178
2179 storage.drop_gsi_table("Orders", "ByDate").unwrap();
2180 }
2181
2182 #[test]
2183 fn test_nonexistent_table_metadata() {
2184 let storage = test_storage();
2185 assert!(storage.get_table_metadata("Nonexistent").unwrap().is_none());
2186 assert!(!storage.delete_table_metadata("Nonexistent").unwrap());
2187 }
2188
2189 #[test]
2190 fn test_metadata_cache_hit() {
2191 let storage = test_storage();
2192 storage
2193 .insert_table_metadata(&CreateTableMetadata {
2194 table_name: "CacheTest",
2195 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2196 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2197 created_at: 1000000,
2198 ..Default::default()
2199 })
2200 .unwrap();
2201
2202 let meta1 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2204 assert_eq!(meta1.table_name, "CacheTest");
2205
2206 let meta2 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2208 assert_eq!(meta2.table_name, "CacheTest");
2209 assert_eq!(meta1.created_at, meta2.created_at);
2210
2211 assert!(storage.metadata_cache.borrow().contains_key("CacheTest"));
2213 }
2214
2215 #[test]
2216 fn test_metadata_cache_invalidated_on_delete() {
2217 let storage = test_storage();
2218 storage
2219 .insert_table_metadata(&CreateTableMetadata {
2220 table_name: "DelCache",
2221 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2222 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2223 created_at: 1000000,
2224 ..Default::default()
2225 })
2226 .unwrap();
2227
2228 storage.get_table_metadata("DelCache").unwrap();
2230 assert!(storage.metadata_cache.borrow().contains_key("DelCache"));
2231
2232 storage.delete_table_metadata("DelCache").unwrap();
2234 assert!(!storage.metadata_cache.borrow().contains_key("DelCache"));
2235 }
2236
2237 #[test]
2238 fn test_metadata_cache_invalidated_on_stream_enable() {
2239 let storage = test_storage();
2240 storage
2241 .insert_table_metadata(&CreateTableMetadata {
2242 table_name: "StreamCache",
2243 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2244 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2245 created_at: 1000000,
2246 ..Default::default()
2247 })
2248 .unwrap();
2249
2250 let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2252 assert!(!meta.stream_enabled);
2253
2254 storage
2256 .enable_stream("StreamCache", "NEW_AND_OLD_IMAGES", "2024-01-01T00:00:00")
2257 .unwrap();
2258 assert!(!storage.metadata_cache.borrow().contains_key("StreamCache"));
2259
2260 let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2262 assert!(meta.stream_enabled);
2263 }
2264
2265 #[test]
2266 fn test_metadata_cache_invalidated_on_ttl_update() {
2267 let storage = test_storage();
2268 storage
2269 .insert_table_metadata(&CreateTableMetadata {
2270 table_name: "TtlCache",
2271 key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2272 attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2273 created_at: 1000000,
2274 ..Default::default()
2275 })
2276 .unwrap();
2277
2278 let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2280 assert!(!meta.ttl_enabled);
2281
2282 storage
2284 .update_ttl_config("TtlCache", Some("expires_at"), true)
2285 .unwrap();
2286 assert!(!storage.metadata_cache.borrow().contains_key("TtlCache"));
2287
2288 let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2290 assert!(meta.ttl_enabled);
2291 assert_eq!(meta.ttl_attribute, Some("expires_at".to_string()));
2292 }
2293
2294 #[test]
2295 fn test_num_to_buffer_zero() {
2296 assert_eq!(num_to_buffer("0"), vec![0x80]);
2298 assert_eq!(num_to_buffer("-0"), vec![0x80]);
2299 }
2300
2301 #[test]
2302 fn test_hash_prefix_string_keys() {
2303 let h1 = compute_hash_prefix(&AttributeValue::S("3635".into()));
2306 let h2 = compute_hash_prefix(&AttributeValue::S("228".into()));
2307 let h3 = compute_hash_prefix(&AttributeValue::S("1668".into()));
2308 let h4 = compute_hash_prefix(&AttributeValue::S("3435".into()));
2309
2310 assert_eq!(
2313 hash_bucket(&h1),
2314 0,
2315 "3635 should be bucket 0, got hash {h1}"
2316 );
2317 assert_eq!(hash_bucket(&h2), 0, "228 should be bucket 0, got hash {h2}");
2318
2319 assert_eq!(
2321 hash_bucket(&h3),
2322 1,
2323 "1668 should be bucket 1, got hash {h3}"
2324 );
2325
2326 assert_eq!(
2328 hash_bucket(&h4),
2329 4,
2330 "3435 should be bucket 4, got hash {h4}"
2331 );
2332 }
2333
2334 #[test]
2335 fn test_hash_prefix_number_keys() {
2336 let h1 = compute_hash_prefix(&AttributeValue::N("251".into()));
2339 assert_eq!(hash_bucket(&h1), 1, "251 should be bucket 1, got hash {h1}");
2340
2341 let h2 = compute_hash_prefix(&AttributeValue::N("2388".into()));
2343 assert_eq!(
2344 hash_bucket(&h2),
2345 4095,
2346 "2388 should be bucket 4095, got hash {h2}"
2347 );
2348 }
2349
2350 #[test]
2351 fn test_hash_in_segment() {
2352 assert!(hash_in_segment("000000", 0, 4096));
2354 assert!(!hash_in_segment("000000", 1, 4096));
2355
2356 assert!(hash_in_segment("001000", 1, 4096));
2358 assert!(!hash_in_segment("001000", 0, 4096));
2359
2360 assert!(hash_in_segment("fff000", 4095, 4096));
2362 assert!(!hash_in_segment("fff000", 0, 4096));
2363
2364 assert!(hash_in_segment("000000", 0, 2));
2366 assert!(hash_in_segment("7ff000", 0, 2));
2367 assert!(hash_in_segment("800000", 1, 2));
2368 assert!(hash_in_segment("fff000", 1, 2));
2369 }
2370}