Skip to main content

dynoxide/
storage.rs

1#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
2use crate::errors::{DynoxideError, Result};
3#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
4use crate::storage_backend::clock::{Clock, SystemClock};
5#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
6use crate::storage_backend::sql_builders::{self, escape_table_name};
7use crate::types::AttributeValue;
8#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
9use rusqlite::{Connection, params};
10#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
11use std::{cell::RefCell, collections::HashMap, sync::Arc};
12
/// On-disk schema version, recorded under the `schema_version` key of the
/// `_config` table. `Storage::initialize` runs every `migrate_*` step whose
/// target is newer than the stored value, so raise this together with adding
/// a new step.
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
const SCHEMA_VERSION: &str = "8";

/// Size of the parallel-scan bucket space: one bucket per value of a
/// three-hex-digit hash prefix (`0x000..=0xFFF`). Same count as dynalite.
pub(crate) const HASH_BUCKETS: u32 = 0x1000;
20
21// ---------------------------------------------------------------------------
22// MD5-based hash prefix for parallel scan (dynalite-compatible)
23// ---------------------------------------------------------------------------
24
25/// Compute the 6-character hex hash prefix for a partition key value.
26///
27/// Uses `MD5("Outliers" + key_bytes)` where `key_bytes` depends on the
28/// attribute type:
29/// - `S`: UTF-8 bytes of the string
30/// - `N`: Oracle packed BCD encoding via [`num_to_buffer`]
31/// - `B`: raw binary bytes
32///
33/// This matches dynalite's `hashPrefix` function.
34pub fn compute_hash_prefix(pk_value: &AttributeValue) -> String {
35    let key_bytes = match pk_value {
36        AttributeValue::S(s) => s.as_bytes().to_vec(),
37        AttributeValue::N(n) => num_to_buffer(n),
38        AttributeValue::B(b) => b.clone(),
39        _ => vec![], // Should not happen for valid keys
40    };
41
42    let digest = md5::compute([b"Outliers" as &[u8], &key_bytes].concat());
43    format!("{:032x}", digest)[..6].to_string()
44}
45
/// Compute the hash bucket (0..=4095) from a hex hash prefix.
///
/// The bucket is the first three characters of `hash_prefix` parsed as a
/// base-16 number. A prefix shorter than three characters is parsed whole.
/// Input that fails to parse maps to bucket 0. This includes the empty
/// string, which is the column default for rows written before hash prefixes
/// existed.
///
/// Never panics. The prefix is taken with `str::get`, so a non-ASCII input
/// whose third byte is not a character boundary falls back to the whole
/// string (which then fails to parse and yields 0).
pub fn hash_bucket(hash_prefix: &str) -> u32 {
    let leading = hash_prefix.get(..3).unwrap_or(hash_prefix);
    u32::from_str_radix(leading, 16).unwrap_or(0)
}
51
52/// Convert a DynamoDB number string to Oracle-style packed BCD bytes.
53///
54/// Faithfully ports dynalite's `numToBuffer` function from `db/index.js`.
55/// Uses Big.js semantics: `num.s` (sign), `num.c` (coefficient digits),
56/// `num.e` (exponent = position of first digit relative to decimal point - 1).
57fn num_to_buffer(num_str: &str) -> Vec<u8> {
58    let trimmed = num_str.trim();
59    if trimmed.is_empty() {
60        return vec![0x80];
61    }
62
63    use bigdecimal::BigDecimal;
64    use std::str::FromStr;
65
66    let bd = match BigDecimal::from_str(trimmed) {
67        Ok(v) => v,
68        Err(_) => return vec![0x80],
69    };
70
71    if bd.sign() == bigdecimal::num_bigint::Sign::NoSign {
72        return vec![0x80];
73    }
74
75    let is_negative = bd.sign() == bigdecimal::num_bigint::Sign::Minus;
76    let bd_abs = if is_negative { -&bd } else { bd.clone() };
77
78    let (mantissa, exponent) = extract_mantissa_and_exponent(&bd_abs);
79    if mantissa.is_empty() {
80        return vec![0x80];
81    }
82
83    // JS: scale = num.s (-1 for negative, 1 for positive)
84    // JS: appendZero = exponent % 2 ? 1 : 0
85    let append_zero: i64 = if exponent % 2 != 0 { 1 } else { 0 };
86    let byte_len_no_exp = ((mantissa.len() as i64 + append_zero + 1) / 2) as usize;
87
88    let mut byte_array: Vec<u8>;
89    if byte_len_no_exp < 20 && is_negative {
90        byte_array = vec![0u8; byte_len_no_exp + 2];
91        byte_array[byte_len_no_exp + 1] = 102;
92    } else {
93        byte_array = vec![0u8; byte_len_no_exp + 1];
94    }
95
96    // byteArray[0] = Math.floor((exponent + appendZero) / 2) - 64
97    // For negative exponents, JS Math.floor(-3/2) = -2, matching Rust integer division
98    // for negative values we need floor division, not truncation.
99    let exp_sum = exponent + append_zero;
100    let exp_byte_val = floor_div(exp_sum, 2) - 64;
101    if is_negative {
102        // byteArray[0] ^= 0xffffffff — JS bitwise XOR on a number
103        // This effectively does a bitwise NOT on the low byte
104        byte_array[0] = (exp_byte_val ^ !0i64) as u8;
105    } else {
106        byte_array[0] = exp_byte_val as u8;
107    }
108
109    // The main loop faithfully mirrors the JS for loop.
110    // JS uses mantissaIndex as a signed int that can be decremented to -1.
111    let mut mi: i64 = 0; // mantissaIndex (signed to allow -1)
112    let mlen = mantissa.len() as i64;
113    let mut appended_zero = false;
114
115    while mi < mlen {
116        let bai = ((mi + append_zero) / 2 + 1) as usize; // byteArrayIndex
117        if append_zero != 0 && mi == 0 && !appended_zero {
118            byte_array[bai] = 0;
119            appended_zero = true;
120            mi -= 1; // JS: mantissaIndex--
121        } else if (mi + append_zero) % 2 == 0 {
122            byte_array[bai] = mantissa[mi as usize] * 10;
123        } else {
124            byte_array[bai] += mantissa[mi as usize];
125        }
126
127        // Finalise byte: if odd position or last mantissa digit
128        if ((mi + append_zero) % 2 != 0) || (mi == mlen - 1) {
129            if is_negative {
130                byte_array[bai] = 101u8.wrapping_sub(byte_array[bai]);
131            } else {
132                byte_array[bai] = byte_array[bai].wrapping_add(1);
133            }
134        }
135
136        mi += 1; // JS: for loop increment
137    }
138
139    byte_array
140}
141
/// Integer division rounding toward negative infinity. For integral operands
/// this matches JavaScript's `Math.floor(a / b)`.
///
/// Rust's `/` truncates toward zero. The two differ when the division is
/// inexact and the remainder's sign differs from the divisor's.
/// Panics if `b == 0`.
fn floor_div(a: i64, b: i64) -> i64 {
    let (quotient, remainder) = (a / b, a % b);
    let opposite_signs = (remainder < 0) != (b < 0);
    if remainder != 0 && opposite_signs {
        quotient - 1
    } else {
        quotient
    }
}
148
149/// Extract mantissa digits and exponent from a BigDecimal.
150///
151/// Returns (digits, exponent) matching Big.js semantics where:
152/// - digits: array of individual digits [2, 5, 1] for "251"
153/// - exponent: number of digits before the decimal point
154///   e.g., "251" → exponent=3, "0.012345" → exponent=-1
155fn extract_mantissa_and_exponent(bd: &bigdecimal::BigDecimal) -> (Vec<u8>, i64) {
156    // Normalize to remove trailing zeros
157    let normalized = bd.normalized();
158
159    // Get the string representation without scientific notation
160    // We need the digits and the position of the decimal point
161    let (bigint, scale) = normalized.as_bigint_and_exponent();
162    let digits_str = bigint.to_string();
163    let digits_str = digits_str.trim_start_matches('-');
164
165    let digits: Vec<u8> = digits_str
166        .chars()
167        .map(|c| c.to_digit(10).unwrap() as u8)
168        .collect();
169
170    // scale = number of digits after decimal point in the representation
171    // exponent (Big.js style) = number_of_digits - scale = digits.len() as i64 - scale
172    let exponent = digits.len() as i64 - scale;
173
174    (digits, exponent)
175}
176
177/// Check whether a hash_prefix falls within the range for a given segment.
178///
179/// Uses dynalite's 4096-bucket scheme:
180/// - bucket = parseInt(hash_prefix[0..3], 16)
181/// - segment owns buckets from ceil(4096 * segment / total) to
182///   ceil(4096 * (segment+1) / total) - 1
183pub fn hash_in_segment(hash_prefix: &str, segment: u32, total_segments: u32) -> bool {
184    let bucket = hash_bucket(hash_prefix);
185    let start = ceiling_div(HASH_BUCKETS * segment, total_segments);
186    let end = ceiling_div(HASH_BUCKETS * (segment + 1), total_segments) - 1;
187    bucket >= start && bucket <= end
188}
189
/// Unsigned division that rounds up: the smallest `q` with `q * b >= a`.
///
/// Panics if `b == 0`.
pub(crate) fn ceiling_div(a: u32, b: u32) -> u32 {
    let whole = a / b;
    if a % b == 0 {
        whole
    } else {
        whole + 1
    }
}
193
/// Paging and parallel-scan options for a scan over a base table or an index.
///
/// `Default` gives an unbounded, unsegmented scan with no start cursor.
#[derive(Debug, Default)]
pub struct ScanParams<'a> {
    /// Maximum number of items to return, if bounded.
    pub limit: Option<usize>,
    /// Partition-key component of the exclusive start cursor.
    pub exclusive_start_pk: Option<&'a str>,
    /// Sort-key component of the exclusive start cursor.
    pub exclusive_start_sk: Option<&'a str>,
    /// Zero-based segment number for a parallel scan.
    pub segment: Option<u32>,
    /// Total number of segments the parallel scan is divided into.
    pub total_segments: Option<u32>,
    /// LSI pagination only: base-table partition key of the composite cursor.
    pub exclusive_start_base_pk: Option<&'a str>,
    /// LSI pagination only: base-table sort key of the composite cursor.
    pub exclusive_start_base_sk: Option<&'a str>,
}
207
/// Column values for a new row in the `_tables` metadata table.
///
/// String fields hold pre-serialized values. GSI/LSI definitions are JSON
/// arrays of index descriptions carrying an `IndexName`. Optional fields are
/// `None` when the table has no such setting.
#[derive(Debug, Default)]
pub struct CreateTableMetadata<'a> {
    /// DynamoDB table name (primary key of `_tables`).
    pub table_name: &'a str,
    /// Serialized key schema.
    pub key_schema: &'a str,
    /// Serialized attribute definitions.
    pub attribute_definitions: &'a str,
    /// JSON array of global secondary index definitions.
    pub gsi_definitions: Option<&'a str>,
    /// JSON array of local secondary index definitions.
    pub lsi_definitions: Option<&'a str>,
    /// Serialized provisioned throughput, if provisioned.
    pub provisioned_throughput: Option<&'a str>,
    /// Creation timestamp. NOTE(review): unit (seconds vs millis) not visible here.
    pub created_at: i64,
    /// Serialized server-side-encryption specification.
    pub sse_specification: Option<&'a str>,
    /// Table class name.
    pub table_class: Option<&'a str>,
    /// Whether deletion protection is enabled.
    pub deletion_protection_enabled: bool,
    /// Billing mode name.
    pub billing_mode: Option<&'a str>,
    /// Serialized (JSON) on-demand throughput limits.
    pub on_demand_throughput: Option<&'a str>,
}
224
/// Sort-key filtering, ordering and paging options for a query over a base
/// table or an index.
///
/// Note that `Default` sets `forward` to `false`.
#[derive(Debug, Default)]
pub struct QueryParams<'a> {
    /// SQL fragment restricting the sort key, if any.
    pub sk_condition: Option<&'a str>,
    /// Values bound into `sk_condition`.
    pub sk_params: &'a [&'a str],
    /// Sort-key direction flag. Presumably `true` means ascending (DynamoDB
    /// `ScanIndexForward`); confirm against the query builder.
    pub forward: bool,
    /// Maximum number of items to return, if bounded.
    pub limit: Option<usize>,
    /// Sort-key component of the exclusive start cursor.
    pub exclusive_start_sk: Option<&'a str>,
    /// LSI pagination only: base-table partition key of the composite cursor.
    pub exclusive_start_base_pk: Option<&'a str>,
    /// LSI pagination only: base-table sort key of the composite cursor.
    pub exclusive_start_base_sk: Option<&'a str>,
}
238
/// rusqlite-backed storage engine.
///
/// Owns the SQLite connection, the `_tables`/`_config` metadata tables, and one
/// SQLite table per DynamoDB table or secondary index. SQL execution is
/// centralized here (statement text partly comes from `sql_builders`), so
/// higher layers deal only in Rust types.
///
/// Compiled only with `native-sqlite` or an encryption feature. Backend-neutral
/// builds such as `wasm-sqlite` leave it out.
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
pub struct Storage {
    /// The open SQLite connection.
    conn: Connection,
    /// Cached `_tables` rows keyed by table name, evicted by metadata writers.
    /// `RefCell` is acceptable because `Storage` is always used behind
    /// `Arc<Mutex<_>>` (the `RefCell` makes it `!Sync` on its own).
    metadata_cache: RefCell<HashMap<String, TableMetadata>>,
    /// Time source for stream and TTL handling. [`SystemClock`] unless
    /// replaced through `with_clock`.
    clock: Arc<dyn Clock>,
}
255
256#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
257impl Storage {
258    /// Open a persistent database at the given path.
259    pub fn new(path: &str) -> Result<Self> {
260        let conn = Connection::open(path)?;
261        let mut storage = Self {
262            conn,
263            metadata_cache: RefCell::new(HashMap::new()),
264            clock: Arc::new(SystemClock),
265        };
266        storage.initialize().map_err(Self::maybe_encrypted_error)?;
267        Ok(storage)
268    }
269
270    /// Replace the [`Clock`] used by the stream and TTL paths. Returns `self`
271    /// so callers can chain construction (`Storage::memory()?.with_clock(c)`).
272    ///
273    /// Default for every constructor is [`SystemClock`]. Tests use this to
274    /// inject a `ManualClock`.
275    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
276        self.clock = clock;
277        self
278    }
279
280    /// Borrow the active [`Clock`]. Used by stream and TTL paths.
281    pub(crate) fn clock(&self) -> &dyn Clock {
282        self.clock.as_ref()
283    }
284
285    /// If a SQLite error is SQLITE_NOTADB, return a clearer error message
286    /// suggesting the database may be encrypted.
287    fn maybe_encrypted_error(err: DynoxideError) -> DynoxideError {
288        if let DynoxideError::SqliteError(ref sqlite_err) = err {
289            if let Some(rusqlite::ErrorCode::NotADatabase) = sqlite_err.sqlite_error_code() {
290                return DynoxideError::InternalServerError(
291                    "Database file is encrypted or not a valid SQLite database. \
292                     If encrypted, enable the `encryption` or `encryption-cc` feature \
293                     and use Database::new_encrypted() with the correct key."
294                        .to_string(),
295                );
296            }
297        }
298        err
299    }
300
/// Open or create an encrypted persistent database at `path`.
///
/// `key` is handed to SQLCipher as a raw hex key via `PRAGMA key = x'...'`.
/// The file is encrypted at rest using AES-256-CBC. SQLCipher treats a database
/// opened without `PRAGMA key` as unencrypted, so the key is applied before
/// anything else touches the file. A `SELECT` on `sqlite_master` then checks
/// that the key actually works before schema initialization runs.
///
/// The formatted pragma string holding the key is zeroized on every path. This
/// includes a failed `pragma_update`: the wipe happens before that error is
/// propagated.
#[cfg(feature = "_has-encryption")]
pub fn new_encrypted(path: &str, key: &str) -> Result<Self> {
    use zeroize::Zeroize;

    let conn = Connection::open(path)?;
    // Safety: key is validated to be exactly 64 hex characters [0-9a-fA-F]
    // by Database::new_encrypted(), so no injection is possible in the
    // x'...' hex literal format. Note: pragma_update does NOT use parameter
    // binding for the PRAGMA value — hex validation is the sole injection defense.
    let mut pragma_val = format!("x'{key}'");
    let keyed = conn.pragma_update(None, "key", &pragma_val);
    // Wipe the key copy before `?` can return early on a failed pragma.
    pragma_val.zeroize();
    keyed?;
    // Verify the key works by reading from the database.
    conn.execute_batch("SELECT count(*) FROM sqlite_master;")?;
    let mut storage = Self {
        conn,
        metadata_cache: RefCell::new(HashMap::new()),
        clock: Arc::new(SystemClock),
    };
    storage.initialize()?;
    Ok(storage)
}
328
329    /// Open an in-memory database (for tests and ephemeral use).
330    pub fn memory() -> Result<Self> {
331        let conn = Connection::open_in_memory()?;
332        let mut storage = Self {
333            conn,
334            metadata_cache: RefCell::new(HashMap::new()),
335            clock: Arc::new(SystemClock),
336        };
337        storage.initialize()?;
338        Ok(storage)
339    }
340
341    /// Initialize the database: WAL mode, metadata tables, config.
342    fn initialize(&mut self) -> Result<()> {
343        // Enable WAL mode for better concurrency
344        self.conn.pragma_update(None, "journal_mode", "WAL")?;
345
346        // Register FNV-1a hash function for parallel scan segment assignment.
347        // Uses get_raw() to borrow directly from SQLite's buffer (zero-copy).
348        self.conn.create_scalar_function(
349            "fnv1a_hash",
350            1,
351            rusqlite::functions::FunctionFlags::SQLITE_DETERMINISTIC
352                | rusqlite::functions::FunctionFlags::SQLITE_UTF8,
353            |ctx: &rusqlite::functions::Context| -> rusqlite::Result<i64> {
354                let pk_ref = ctx.get_raw(0);
355                let pk_bytes = match pk_ref {
356                    rusqlite::types::ValueRef::Text(bytes) => bytes,
357                    _ => {
358                        return Err(rusqlite::Error::InvalidFunctionParameterType(
359                            0,
360                            rusqlite::types::Type::Text,
361                        ));
362                    }
363                };
364                let mut hash: u32 = 2166136261;
365                for &byte in pk_bytes {
366                    hash ^= byte as u32;
367                    hash = hash.wrapping_mul(16777619);
368                }
369                Ok(hash as i64)
370            },
371        )?;
372
373        // Create metadata tables (schema shared with the wasm backend).
374        self.conn.execute_batch(sql_builders::INIT_SCHEMA)?;
375
376        // Migrate: add user_identity column if it doesn't exist (for databases created before Phase 11)
377        let _ = self
378            .conn
379            .execute_batch("ALTER TABLE _stream_records ADD COLUMN user_identity TEXT");
380
381        // Set schema version if not present
382        self.conn.execute(
383            "INSERT OR IGNORE INTO _config (key, value) VALUES ('schema_version', ?1)",
384            params![SCHEMA_VERSION],
385        )?;
386
387        // Run schema migrations
388        let version: i32 = self
389            .conn
390            .query_row(
391                "SELECT value FROM _config WHERE key = 'schema_version'",
392                [],
393                |r| r.get::<_, String>(0),
394            )
395            .unwrap_or_else(|_| "1".to_string())
396            .parse()
397            .unwrap_or(1);
398
399        if version < 2 {
400            self.migrate_v1_to_v2()?;
401        }
402        if version < 3 {
403            self.migrate_v2_to_v3()?;
404        }
405        if version < 4 {
406            self.migrate_v3_to_v4()?;
407        }
408        if version < 5 {
409            self.migrate_v4_to_v5()?;
410        }
411        if version < 6 {
412            self.migrate_v5_to_v6()?;
413        }
414        if version < 7 {
415            self.migrate_v6_to_v7()?;
416        }
417        if version < 8 {
418            self.migrate_v7_to_v8()?;
419        }
420
421        Ok(())
422    }
423
424    /// Migrate from schema v1 to v2: add `cached_at REAL` column to all data tables.
425    fn migrate_v1_to_v2(&self) -> Result<()> {
426        let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
427        let table_names: Vec<String> = stmt
428            .query_map([], |row| row.get(0))?
429            .collect::<std::result::Result<Vec<_>, _>>()?;
430
431        for table_name in &table_names {
432            let escaped = format!("\"{}\"", table_name.replace('"', "\"\""));
433            let _ = self.conn.execute(
434                &format!("ALTER TABLE {escaped} ADD COLUMN cached_at REAL"),
435                [],
436            );
437        }
438
439        self.conn.execute(
440            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '2')",
441            [],
442        )?;
443
444        Ok(())
445    }
446
447    /// Migrate from schema v2 to v3: add `tags TEXT` column to `_tables`.
448    fn migrate_v2_to_v3(&self) -> Result<()> {
449        let _ = self
450            .conn
451            .execute("ALTER TABLE _tables ADD COLUMN tags TEXT", []);
452
453        self.conn.execute(
454            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '3')",
455            [],
456        )?;
457
458        Ok(())
459    }
460
461    /// Migrate from schema v3 to v4: add SSE, table class, and deletion protection columns.
462    fn migrate_v3_to_v4(&self) -> Result<()> {
463        let _ = self
464            .conn
465            .execute("ALTER TABLE _tables ADD COLUMN sse_specification TEXT", []);
466        let _ = self
467            .conn
468            .execute("ALTER TABLE _tables ADD COLUMN table_class TEXT", []);
469        let _ = self.conn.execute(
470            "ALTER TABLE _tables ADD COLUMN deletion_protection_enabled INTEGER DEFAULT 0",
471            [],
472        );
473
474        self.conn.execute(
475            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '4')",
476            [],
477        )?;
478
479        Ok(())
480    }
481
482    /// Migrate from schema v4 to v5: add secondary indexes on GSI and LSI base-key columns.
483    fn migrate_v4_to_v5(&self) -> Result<()> {
484        let mut stmt = self
485            .conn
486            .prepare("SELECT table_name, gsi_definitions, lsi_definitions FROM _tables")?;
487        let tables: Vec<(String, Option<String>, Option<String>)> = stmt
488            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
489            .collect::<std::result::Result<Vec<_>, _>>()?;
490
491        for (table_name, gsi_json, lsi_json) in &tables {
492            if let Some(json) = gsi_json {
493                if let Ok(gsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
494                    for gsi in &gsis {
495                        if let Some(idx) = gsi.get("IndexName").and_then(|v| v.as_str()) {
496                            let gsi_table = escape_table_name(&format!("{table_name}::gsi::{idx}"));
497                            let idx_name =
498                                escape_table_name(&format!("{table_name}::gsi::{idx}::base_key"));
499                            let _ = self.conn.execute_batch(&format!(
500                                "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{gsi_table}\" (table_pk, table_sk)"
501                            ));
502                        }
503                    }
504                }
505            }
506            if let Some(json) = lsi_json {
507                if let Ok(lsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
508                    for lsi in &lsis {
509                        if let Some(idx) = lsi.get("IndexName").and_then(|v| v.as_str()) {
510                            let lsi_table = escape_table_name(&format!("{table_name}::lsi::{idx}"));
511                            let idx_name =
512                                escape_table_name(&format!("{table_name}::lsi::{idx}::base_key"));
513                            let _ = self.conn.execute_batch(&format!(
514                                "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{lsi_table}\" (base_pk, base_sk)"
515                            ));
516                        }
517                    }
518                }
519            }
520        }
521
522        self.conn.execute(
523            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '5')",
524            [],
525        )?;
526
527        Ok(())
528    }
529
530    /// Migrate from schema v5 to v6: add `hash_prefix TEXT` column to all data tables.
531    fn migrate_v5_to_v6(&self) -> Result<()> {
532        let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
533        let table_names: Vec<String> = stmt
534            .query_map([], |row| row.get(0))?
535            .collect::<std::result::Result<Vec<_>, _>>()?;
536
537        for table_name in &table_names {
538            let escaped = escape_table_name(table_name);
539            let _ = self.conn.execute(
540                &format!(
541                    "ALTER TABLE \"{escaped}\" ADD COLUMN hash_prefix TEXT NOT NULL DEFAULT ''"
542                ),
543                [],
544            );
545        }
546
547        self.conn.execute(
548            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '6')",
549            [],
550        )?;
551
552        Ok(())
553    }
554
555    /// Migrate from schema v6 to v7: add `on_demand_throughput TEXT` column to `_tables`.
556    fn migrate_v6_to_v7(&self) -> Result<()> {
557        let _ = self.conn.execute(
558            "ALTER TABLE _tables ADD COLUMN on_demand_throughput TEXT",
559            [],
560        );
561
562        self.conn.execute(
563            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '7')",
564            [],
565        )?;
566
567        Ok(())
568    }
569
570    /// Migrate from schema v7 to v8: add a `table_id TEXT` column to `_tables`
571    /// and backfill existing tables with a one-time random UUID.
572    ///
573    /// TableId must stay stable between reads of the same table (#55). New
574    /// tables get their id at insert time; tables created before this column
575    /// existed are assigned a persisted id here, once. The backfill `SELECT`
576    /// references `table_id`, so if the `ALTER` did not actually add the column
577    /// (a genuine failure, swallowed below for the duplicate-column re-run case)
578    /// this step errors out before the version is stamped, rather than stamping
579    /// a half-applied migration.
580    fn migrate_v7_to_v8(&self) -> Result<()> {
581        let _ = self
582            .conn
583            .execute("ALTER TABLE _tables ADD COLUMN table_id TEXT", []);
584
585        let names: Vec<String> = {
586            let mut stmt = self
587                .conn
588                .prepare("SELECT table_name FROM _tables WHERE table_id IS NULL")?;
589            let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
590            rows.collect::<rusqlite::Result<Vec<String>>>()?
591        };
592        for name in names {
593            let id = uuid::Uuid::new_v4().to_string();
594            self.conn.execute(
595                "UPDATE _tables SET table_id = ?1 WHERE table_name = ?2",
596                params![id, name],
597            )?;
598        }
599
600        self.conn.execute(
601            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '8')",
602            [],
603        )?;
604
605        Ok(())
606    }
607
608    /// Get a reference to the underlying connection (for transactions, etc.).
609    pub fn conn(&self) -> &Connection {
610        &self.conn
611    }
612
613    /// Get a mutable reference to the underlying connection.
614    pub fn conn_mut(&mut self) -> &mut Connection {
615        &mut self.conn
616    }
617
618    // -----------------------------------------------------------------------
619    // Table metadata
620    // -----------------------------------------------------------------------
621
622    /// Insert a row into the `_tables` metadata table.
623    pub fn insert_table_metadata(&self, m: &CreateTableMetadata) -> Result<()> {
624        let table_name = m.table_name;
625        let (sql, params) = sql_builders::insert_table_metadata(m);
626        self.conn
627            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
628        self.metadata_cache.borrow_mut().remove(table_name);
629        Ok(())
630    }
631
632    /// Get metadata for a table. Returns None if the table doesn't exist.
633    ///
634    /// Results are cached in memory. The cache is invalidated when metadata
635    /// is modified via `insert_table_metadata`, `delete_table_metadata`,
636    /// `enable_stream`, or `update_ttl_config`.
637    pub fn get_table_metadata(&self, table_name: &str) -> Result<Option<TableMetadata>> {
638        // Check cache first
639        if let Some(cached) = self.metadata_cache.borrow().get(table_name) {
640            return Ok(Some(cached.clone()));
641        }
642
643        let (sql, params) = sql_builders::get_table_metadata(table_name);
644        let mut stmt = self.conn.prepare(&sql)?;
645
646        let result = stmt.query_row(rusqlite::params_from_iter(params.iter()), row_to_metadata);
647
648        match result {
649            Ok(meta) => {
650                self.metadata_cache
651                    .borrow_mut()
652                    .insert(table_name.to_string(), meta.clone());
653                Ok(Some(meta))
654            }
655            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
656            Err(e) => Err(DynoxideError::from(e)),
657        }
658    }
659
660    /// Delete metadata for a table.
661    pub fn delete_table_metadata(&self, table_name: &str) -> Result<bool> {
662        let (sql, params) = sql_builders::delete_table_metadata(table_name);
663        let affected = self
664            .conn
665            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
666        self.metadata_cache.borrow_mut().remove(table_name);
667        Ok(affected > 0)
668    }
669
670    /// Update attribute definitions and GSI definitions for a table.
671    pub fn update_table_metadata(
672        &self,
673        table_name: &str,
674        attribute_definitions: &str,
675        gsi_definitions: Option<&str>,
676    ) -> Result<()> {
677        let (sql, params) =
678            sql_builders::update_table_metadata(table_name, attribute_definitions, gsi_definitions);
679        self.conn
680            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
681        self.metadata_cache.borrow_mut().remove(table_name);
682        Ok(())
683    }
684
685    /// Update provisioned throughput for a table.
686    pub fn update_provisioned_throughput(
687        &self,
688        table_name: &str,
689        provisioned_throughput: &str,
690    ) -> Result<()> {
691        let (sql, params) =
692            sql_builders::update_provisioned_throughput(table_name, provisioned_throughput);
693        self.conn
694            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
695        self.metadata_cache.borrow_mut().remove(table_name);
696        Ok(())
697    }
698
699    /// Clear provisioned throughput for a table (sets to SQL NULL).
700    pub fn clear_provisioned_throughput(&self, table_name: &str) -> Result<()> {
701        let (sql, params) = sql_builders::clear_provisioned_throughput(table_name);
702        self.conn
703            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
704        self.metadata_cache.borrow_mut().remove(table_name);
705        Ok(())
706    }
707
708    /// Update billing mode for a table.
709    pub fn update_billing_mode(&self, table_name: &str, billing_mode: &str) -> Result<()> {
710        let (sql, params) = sql_builders::update_billing_mode(table_name, billing_mode);
711        self.conn
712            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
713        self.metadata_cache.borrow_mut().remove(table_name);
714        Ok(())
715    }
716
717    /// Update the table class for a table.
718    pub fn update_table_class(&self, table_name: &str, table_class: &str) -> Result<()> {
719        let (sql, params) = sql_builders::update_table_class(table_name, table_class);
720        self.conn
721            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
722        self.metadata_cache.borrow_mut().remove(table_name);
723        Ok(())
724    }
725
726    /// Update the on-demand throughput (stored as JSON) for a table.
727    pub fn update_on_demand_throughput(
728        &self,
729        table_name: &str,
730        on_demand_throughput: &str,
731    ) -> Result<()> {
732        let (sql, params) =
733            sql_builders::update_on_demand_throughput(table_name, on_demand_throughput);
734        self.conn
735            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
736        self.metadata_cache.borrow_mut().remove(table_name);
737        Ok(())
738    }
739
740    // -----------------------------------------------------------------------
741    // Tag operations
742    // -----------------------------------------------------------------------
743
744    /// Get tags for a table.
745    pub fn get_tags(&self, table_name: &str) -> Result<Vec<crate::types::Tag>> {
746        let tags_json: Option<String> = self.conn.query_row(
747            "SELECT tags FROM _tables WHERE table_name = ?1",
748            params![table_name],
749            |row| row.get(0),
750        )?;
751
752        match tags_json {
753            Some(json) => serde_json::from_str(&json)
754                .map_err(|e| DynoxideError::InternalServerError(format!("Bad tags JSON: {e}"))),
755            None => Ok(Vec::new()),
756        }
757    }
758
759    /// Set (merge) tags on a table. New keys overwrite existing keys.
760    pub fn set_tags(&self, table_name: &str, new_tags: &[crate::types::Tag]) -> Result<()> {
761        use std::collections::BTreeMap;
762
763        let existing = self.get_tags(table_name)?;
764        let mut tag_map: BTreeMap<String, String> =
765            existing.into_iter().map(|t| (t.key, t.value)).collect();
766
767        for tag in new_tags {
768            tag_map.insert(tag.key.clone(), tag.value.clone());
769        }
770
771        if tag_map.len() > 50 {
772            return Err(DynoxideError::ValidationException(
773                "One or more parameter values were invalid: \
774                 Too many tags: tag limit is 50"
775                    .to_string(),
776            ));
777        }
778
779        let merged: Vec<crate::types::Tag> = tag_map
780            .into_iter()
781            .map(|(k, v)| crate::types::Tag { key: k, value: v })
782            .collect();
783
784        let json = serde_json::to_string(&merged)
785            .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
786
787        self.conn.execute(
788            "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
789            params![json, table_name],
790        )?;
791        Ok(())
792    }
793
794    /// Update the deletion protection setting for a table.
795    pub fn update_deletion_protection(&self, table_name: &str, enabled: bool) -> Result<()> {
796        let (sql, params) = sql_builders::update_deletion_protection(table_name, enabled);
797        self.conn
798            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
799        self.metadata_cache.borrow_mut().remove(table_name);
800        Ok(())
801    }
802
803    /// Remove tags by key from a table.
804    pub fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<()> {
805        let mut tags = self.get_tags(table_name)?;
806        tags.retain(|t| !keys.contains(&t.key));
807
808        let json = if tags.is_empty() {
809            None
810        } else {
811            Some(
812                serde_json::to_string(&tags)
813                    .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
814            )
815        };
816
817        self.conn.execute(
818            "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
819            params![json, table_name],
820        )?;
821        Ok(())
822    }
823
824    /// List all table names.
825    pub fn list_table_names(&self) -> Result<Vec<String>> {
826        let (sql, params) = sql_builders::list_table_names();
827        let mut stmt = self.conn.prepare(&sql)?;
828        let names = stmt
829            .query_map(rusqlite::params_from_iter(params.iter()), |row| row.get(0))?
830            .collect::<std::result::Result<Vec<String>, _>>()?;
831        Ok(names)
832    }
833
834    /// Check if a table exists in metadata.
835    pub fn table_exists(&self, table_name: &str) -> Result<bool> {
836        let (sql, params) = sql_builders::table_exists(table_name);
837        let count: i32 =
838            self.conn
839                .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
840                    row.get(0)
841                })?;
842        Ok(count > 0)
843    }
844
845    /// Invalidate the cached metadata for a specific table.
846    #[allow(dead_code)]
847    pub(crate) fn invalidate_metadata_cache(&self, table_name: &str) {
848        self.metadata_cache.borrow_mut().remove(table_name);
849    }
850
851    // -----------------------------------------------------------------------
852    // Dynamic data tables
853    // -----------------------------------------------------------------------
854
855    /// Create a data table for a DynamoDB table.
856    pub fn create_data_table(&self, table_name: &str) -> Result<()> {
857        let (sql, params) = sql_builders::create_data_table(table_name);
858        self.conn
859            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
860        Ok(())
861    }
862
863    /// Drop a data table.
864    pub fn drop_data_table(&self, table_name: &str) -> Result<()> {
865        let (sql, params) = sql_builders::drop_data_table(table_name);
866        self.conn
867            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
868        Ok(())
869    }
870
871    /// Create a GSI table.
872    pub fn create_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
873        let (sql, _) = sql_builders::create_gsi_table(table_name, index_name);
874        self.conn.execute_batch(&sql)?;
875        Ok(())
876    }
877
878    /// Drop a GSI table.
879    pub fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
880        let (sql, params) = sql_builders::drop_gsi_table(table_name, index_name);
881        self.conn
882            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
883        Ok(())
884    }
885
886    // -----------------------------------------------------------------------
887    // GSI item operations
888    // -----------------------------------------------------------------------
889
890    /// Insert an item into a GSI table.
891    #[allow(clippy::too_many_arguments)]
892    pub fn insert_gsi_item(
893        &self,
894        table_name: &str,
895        index_name: &str,
896        gsi_pk: &str,
897        gsi_sk: &str,
898        table_pk: &str,
899        table_sk: &str,
900        item_json: &str,
901    ) -> Result<()> {
902        let sql = sql_builders::gsi_insert_sql(table_name, index_name);
903        let params = sql_builders::gsi_insert_params(gsi_pk, gsi_sk, table_pk, table_sk, item_json);
904        self.conn
905            .prepare_cached(&sql)?
906            .execute(rusqlite::params_from_iter(params.iter()))?;
907        Ok(())
908    }
909
910    /// Bulk-insert many rows into one GSI table using a single cached
911    /// prepared statement. Equivalent to calling [`Self::insert_gsi_item`]
912    /// once per row, but reuses the statement across the batch.
913    pub fn insert_gsi_items(
914        &self,
915        table_name: &str,
916        index_name: &str,
917        rows: &[crate::storage_backend::GsiItemRow],
918    ) -> Result<()> {
919        let sql = sql_builders::gsi_insert_sql(table_name, index_name);
920        let mut stmt = self.conn.prepare_cached(&sql)?;
921        for row in rows {
922            let params = sql_builders::gsi_insert_params(
923                &row.gsi_pk,
924                &row.gsi_sk,
925                &row.table_pk,
926                &row.table_sk,
927                &row.item_json,
928            );
929            stmt.execute(rusqlite::params_from_iter(params.iter()))?;
930        }
931        Ok(())
932    }
933
934    /// Delete an item from a GSI table by base table primary key.
935    pub fn delete_gsi_item(
936        &self,
937        table_name: &str,
938        index_name: &str,
939        table_pk: &str,
940        table_sk: &str,
941    ) -> Result<()> {
942        let (sql, params) =
943            sql_builders::delete_gsi_item(table_name, index_name, table_pk, table_sk);
944        self.conn
945            .prepare_cached(&sql)?
946            .execute(rusqlite::params_from_iter(params.iter()))?;
947        Ok(())
948    }
949
950    /// Query items from a GSI table.
951    pub fn query_gsi_items(
952        &self,
953        table_name: &str,
954        index_name: &str,
955        gsi_pk: &str,
956        params: &QueryParams,
957    ) -> Result<Vec<(String, String, String)>> {
958        let (sql, params_vec) =
959            sql_builders::query_gsi_items(table_name, index_name, gsi_pk, params);
960        let mut stmt = self.conn.prepare(&sql)?;
961        let rows = stmt
962            .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
963                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
964            })?
965            .collect::<std::result::Result<Vec<_>, _>>()?;
966
967        Ok(rows)
968    }
969
970    /// Scan all items from a GSI table.
971    pub fn scan_gsi_items(
972        &self,
973        table_name: &str,
974        index_name: &str,
975        params: &ScanParams,
976    ) -> Result<Vec<(String, String, String)>> {
977        let (sql, params_vec) = sql_builders::scan_gsi_items(table_name, index_name, params);
978        let mut stmt = self.conn.prepare(&sql)?;
979        let rows = stmt
980            .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
981                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
982            })?
983            .collect::<std::result::Result<Vec<_>, _>>()?;
984
985        Ok(rows)
986    }
987
988    // -----------------------------------------------------------------------
989    // LSI table operations
990    // -----------------------------------------------------------------------
991
992    /// Create an LSI table for a given base table and index name.
993    pub fn create_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
994        let (sql, _) = sql_builders::create_lsi_table(table_name, index_name);
995        self.conn.execute_batch(&sql)?;
996        Ok(())
997    }
998
999    /// Drop an LSI table.
1000    pub fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1001        let (sql, params) = sql_builders::drop_lsi_table(table_name, index_name);
1002        self.conn
1003            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
1004        Ok(())
1005    }
1006
1007    // -----------------------------------------------------------------------
1008    // LSI item operations
1009    // -----------------------------------------------------------------------
1010
1011    /// Insert an item into an LSI table.
1012    #[allow(clippy::too_many_arguments)]
1013    pub fn insert_lsi_item(
1014        &self,
1015        table_name: &str,
1016        index_name: &str,
1017        pk: &str,
1018        sk: &str,
1019        base_pk: &str,
1020        base_sk: &str,
1021        item_json: &str,
1022    ) -> Result<()> {
1023        let sql = sql_builders::lsi_insert_sql(table_name, index_name);
1024        let params = sql_builders::lsi_insert_params(pk, sk, base_pk, base_sk, item_json);
1025        self.conn
1026            .prepare_cached(&sql)?
1027            .execute(rusqlite::params_from_iter(params.iter()))?;
1028        Ok(())
1029    }
1030
1031    /// Delete an item from an LSI table by base table primary key.
1032    pub fn delete_lsi_item(
1033        &self,
1034        table_name: &str,
1035        index_name: &str,
1036        base_pk: &str,
1037        base_sk: &str,
1038    ) -> Result<()> {
1039        let (sql, params) = sql_builders::delete_lsi_item(table_name, index_name, base_pk, base_sk);
1040        self.conn
1041            .prepare_cached(&sql)?
1042            .execute(rusqlite::params_from_iter(params.iter()))?;
1043        Ok(())
1044    }
1045
1046    /// Query items from an LSI table.
1047    pub fn query_lsi_items(
1048        &self,
1049        table_name: &str,
1050        index_name: &str,
1051        pk: &str,
1052        params: &QueryParams,
1053    ) -> Result<Vec<(String, String, String)>> {
1054        let (sql, params_vec) = sql_builders::query_lsi_items(table_name, index_name, pk, params);
1055        let mut stmt = self.conn.prepare(&sql)?;
1056        let rows = stmt
1057            .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1058                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1059            })?
1060            .collect::<std::result::Result<Vec<_>, _>>()?;
1061
1062        Ok(rows)
1063    }
1064
1065    /// Scan all items from an LSI table.
1066    pub fn scan_lsi_items(
1067        &self,
1068        table_name: &str,
1069        index_name: &str,
1070        params: &ScanParams,
1071    ) -> Result<Vec<(String, String, String)>> {
1072        let (sql, params_vec) = sql_builders::scan_lsi_items(table_name, index_name, params);
1073        let mut stmt = self.conn.prepare(&sql)?;
1074        let rows = stmt
1075            .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1076                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1077            })?
1078            .collect::<std::result::Result<Vec<_>, _>>()?;
1079
1080        Ok(rows)
1081    }
1082
1083    // -----------------------------------------------------------------------
1084    // Transaction support
1085    // -----------------------------------------------------------------------
1086
1087    /// Begin an immediate SQLite transaction.
1088    pub fn begin_transaction(&self) -> Result<()> {
1089        self.conn.execute_batch(sql_builders::BEGIN)?;
1090        Ok(())
1091    }
1092
1093    /// Commit the current transaction.
1094    pub fn commit(&self) -> Result<()> {
1095        self.conn.execute_batch(sql_builders::COMMIT)?;
1096        Ok(())
1097    }
1098
1099    /// Rollback the current transaction.
1100    pub fn rollback(&self) -> Result<()> {
1101        self.conn.execute_batch(sql_builders::ROLLBACK)?;
1102        Ok(())
1103    }
1104
1105    /// Set aggressive PRAGMAs for bulk loading.
1106    ///
1107    /// Disables fsync, increases cache, and enables memory-mapped I/O.
1108    /// Only safe when data loss on crash is acceptable (e.g., fresh import
1109    /// that can be re-run).
1110    pub fn enable_bulk_loading(&self) -> Result<()> {
1111        self.conn.execute_batch(
1112            "PRAGMA synchronous = OFF;
1113             PRAGMA cache_size = -64000;
1114             PRAGMA temp_store = MEMORY;
1115             PRAGMA mmap_size = 268435456;",
1116        )?;
1117        Ok(())
1118    }
1119
1120    /// Restore normal PRAGMAs after bulk loading.
1121    pub fn disable_bulk_loading(&self) -> Result<()> {
1122        self.conn.execute_batch(
1123            "PRAGMA synchronous = NORMAL;
1124             PRAGMA cache_size = -2000;
1125             PRAGMA temp_store = DEFAULT;
1126             PRAGMA mmap_size = 0;",
1127        )?;
1128        Ok(())
1129    }
1130
1131    // -----------------------------------------------------------------------
1132    // Item CRUD
1133    // -----------------------------------------------------------------------
1134
1135    /// Insert or replace an item.
1136    pub fn put_item(
1137        &self,
1138        table_name: &str,
1139        pk: &str,
1140        sk: &str,
1141        item_json: &str,
1142        item_size: usize,
1143    ) -> Result<Option<String>> {
1144        self.put_item_with_hash(table_name, pk, sk, item_json, item_size, "")
1145    }
1146
1147    /// Put an item with an explicit hash prefix for parallel scan ordering.
1148    pub fn put_item_with_hash(
1149        &self,
1150        table_name: &str,
1151        pk: &str,
1152        sk: &str,
1153        item_json: &str,
1154        item_size: usize,
1155        hash_prefix: &str,
1156    ) -> Result<Option<String>> {
1157        // First, try to get the old item for return value
1158        let old_item = self.get_item(table_name, pk, sk)?;
1159
1160        let (sql, params) =
1161            sql_builders::put_item_with_hash(table_name, pk, sk, item_json, item_size, hash_prefix);
1162        self.conn
1163            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
1164
1165        Ok(old_item)
1166    }
1167
1168    /// Bulk-insert many base-table rows using a single cached prepared
1169    /// statement (`INSERT OR REPLACE`). Unlike [`Self::put_item_with_hash`],
1170    /// which preserves any existing `cached_at`, this writes `cached_at`
1171    /// verbatim from each row, matching the import flow's semantics.
1172    pub fn put_base_items(
1173        &self,
1174        table_name: &str,
1175        rows: &[crate::storage_backend::BaseItemRow],
1176    ) -> Result<()> {
1177        let escaped = escape_table_name(table_name);
1178        let sql = format!(
1179            "INSERT OR REPLACE INTO \"{escaped}\" (pk, sk, item_json, item_size, cached_at, hash_prefix) \
1180             VALUES (?1, ?2, ?3, ?4, ?5, ?6)"
1181        );
1182        let mut stmt = self.conn.prepare_cached(&sql)?;
1183        for row in rows {
1184            stmt.execute(params![
1185                row.pk,
1186                row.sk,
1187                row.item_json,
1188                row.item_size as i64,
1189                row.cached_at,
1190                row.hash_prefix
1191            ])?;
1192        }
1193        Ok(())
1194    }
1195
1196    /// Get a single item by primary key.
1197    pub fn get_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1198        let (sql, params) = sql_builders::get_item(table_name, pk, sk);
1199        let result = self
1200            .conn
1201            .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1202                row.get(0)
1203            });
1204
1205        match result {
1206            Ok(json) => Ok(Some(json)),
1207            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1208            Err(e) => Err(DynoxideError::from(e)),
1209        }
1210    }
1211
1212    /// Return the total item_size for all items sharing the given partition key.
1213    pub fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64> {
1214        let (sql, params) = sql_builders::get_partition_size(table_name, pk);
1215        let size: i64 =
1216            self.conn
1217                .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1218                    row.get(0)
1219                })?;
1220        Ok(size)
1221    }
1222
1223    /// Return the total size of LSI items for a given partition key.
1224    /// LSI items are stored as JSON text, so we use length(item_json).
1225    pub fn get_lsi_partition_size(
1226        &self,
1227        table_name: &str,
1228        index_name: &str,
1229        pk: &str,
1230    ) -> Result<i64> {
1231        let (sql, params) = sql_builders::get_lsi_partition_size(table_name, index_name, pk);
1232        let size: i64 =
1233            self.conn
1234                .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1235                    row.get(0)
1236                })?;
1237        Ok(size)
1238    }
1239
1240    /// Delete an item by primary key. Returns the old item_json if it existed.
1241    pub fn delete_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1242        let old_item = self.get_item(table_name, pk, sk)?;
1243
1244        let (sql, params) = sql_builders::delete_item(table_name, pk, sk);
1245        self.conn
1246            .execute(&sql, rusqlite::params_from_iter(params.iter()))?;
1247
1248        Ok(old_item)
1249    }
1250
1251    /// Query items by partition key with optional sort key condition.
1252    ///
1253    /// `sk_condition` is a SQL fragment like `AND sk > ?` with `sk_params` providing values.
1254    /// Returns `(items, has_more)` where items is a vec of `(pk, sk, item_json)`.
1255    pub fn query_items(
1256        &self,
1257        table_name: &str,
1258        pk: &str,
1259        params: &QueryParams,
1260    ) -> Result<Vec<(String, String, String)>> {
1261        let (sql, params_vec) = sql_builders::query_items(table_name, pk, params);
1262        let mut stmt = self.conn.prepare(&sql)?;
1263        let rows = stmt
1264            .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1265                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1266            })?
1267            .collect::<std::result::Result<Vec<_>, _>>()?;
1268
1269        Ok(rows)
1270    }
1271
1272    /// Scan items from a table with pagination.
1273    ///
1274    /// Returns `(pk, sk, item_json)` tuples ordered by hash_prefix for
1275    /// dynalite-compatible parallel scan behaviour.
1276    pub fn scan_items(
1277        &self,
1278        table_name: &str,
1279        params: &ScanParams,
1280    ) -> Result<Vec<(String, String, String)>> {
1281        let (sql, params_vec) = sql_builders::scan_items(table_name, params);
1282        let mut stmt = self.conn.prepare(&sql)?;
1283        let rows = stmt
1284            .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
1285                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1286            })?
1287            .collect::<std::result::Result<Vec<_>, _>>()?;
1288
1289        Ok(rows)
1290    }
1291
1292    /// Count items in a table.
1293    pub fn count_items(&self, table_name: &str) -> Result<i64> {
1294        let (sql, params) = sql_builders::count_items(table_name);
1295        let count: i64 =
1296            self.conn
1297                .query_row(&sql, rusqlite::params_from_iter(params.iter()), |row| {
1298                    row.get(0)
1299                })?;
1300        Ok(count)
1301    }
1302
1303    // -----------------------------------------------------------------------
1304    // Introspection
1305    // -----------------------------------------------------------------------
1306
1307    /// Get the database file path, or `None` for in-memory databases.
1308    pub fn db_path(&self) -> Option<String> {
1309        self.conn
1310            .path()
1311            .filter(|p| !p.is_empty())
1312            .map(|p| p.to_owned())
1313    }
1314
1315    /// Get the total database size in bytes.
1316    pub fn db_size_bytes(&self) -> Result<u64> {
1317        let size: i64 = self.conn.query_row(
1318            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1319            [],
1320            |row| row.get(0),
1321        )?;
1322        Ok(size as u64)
1323    }
1324
1325    /// Count the number of DynamoDB tables.
1326    pub fn table_count(&self) -> Result<usize> {
1327        let count: i64 = self
1328            .conn
1329            .query_row("SELECT COUNT(*) FROM _tables", [], |row| row.get(0))?;
1330        Ok(count as usize)
1331    }
1332
1333    /// Get per-table statistics: name, item count, and approximate size in bytes.
1334    ///
1335    /// Uses a single query per table (COUNT + SUM combined) instead of separate queries.
1336    pub fn table_stats(&self) -> Result<Vec<TableStats>> {
1337        let table_names = self.list_table_names()?;
1338        let mut stats = Vec::with_capacity(table_names.len());
1339        for name in table_names {
1340            let sql = format!(
1341                "SELECT COUNT(*), COALESCE(SUM(item_size), 0) FROM \"{}\"",
1342                escape_table_name(&name)
1343            );
1344            let (item_count, size_bytes): (i64, i64) = self
1345                .conn
1346                .query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
1347            stats.push(TableStats {
1348                table_name: name,
1349                item_count,
1350                size_bytes: size_bytes as u64,
1351            });
1352        }
1353        Ok(stats)
1354    }
1355
1356    /// Get combined database info in a single call for atomic consistency.
1357    ///
1358    /// Returns all introspection data that `get_database_info` tools need
1359    /// without releasing the lock between queries.
1360    pub fn database_info(&self) -> Result<DatabaseInfo> {
1361        let path = self.db_path();
1362        let size_bytes = self.db_size_bytes()?;
1363        let table_count = self.table_count()?;
1364        let stats = self.table_stats()?;
1365
1366        let mut table_details = Vec::with_capacity(stats.len());
1367        for s in stats {
1368            let metadata = self.get_table_metadata(&s.table_name)?;
1369            table_details.push(TableInfoEntry { stats: s, metadata });
1370        }
1371
1372        Ok(DatabaseInfo {
1373            path,
1374            size_bytes,
1375            table_count,
1376            tables: table_details,
1377        })
1378    }
1379
1380    // -----------------------------------------------------------------------
1381    // Snapshot operations
1382    // -----------------------------------------------------------------------
1383
1384    /// Create a snapshot of the database by copying it to the given path.
1385    /// Uses `VACUUM INTO` which works for both in-memory and file-backed databases.
1386    pub fn vacuum_into(&self, path: &str) -> Result<()> {
1387        if path.contains('\0') {
1388            return Err(DynoxideError::ValidationException(
1389                "path contains null byte".to_string(),
1390            ));
1391        }
1392        self.conn
1393            .execute_batch(&format!("VACUUM INTO '{}'", path.replace('\'', "''")))?;
1394        Ok(())
1395    }
1396
1397    /// Run VACUUM to compact the database file in place.
1398    pub fn vacuum(&self) -> Result<()> {
1399        self.conn.execute_batch("VACUUM")?;
1400        Ok(())
1401    }
1402
1403    /// Restore the database from a snapshot file using SQLite's backup API.
1404    /// This replaces the current database contents with the snapshot contents.
1405    /// Works for both in-memory and file-backed databases.
1406    pub fn restore_from(&mut self, path: &str) -> Result<()> {
1407        let source = Connection::open(path)?;
1408        self.restore_from_connection(&source)
1409    }
1410
1411    /// Backup the current database to a new in-memory SQLite connection.
1412    ///
1413    /// Used for in-memory snapshot storage — the returned connection holds
1414    /// a complete copy of the database without touching the filesystem.
1415    pub fn backup_to_memory(&self) -> Result<Connection> {
1416        let mut dest = Connection::open_in_memory()?;
1417        {
1418            let backup = rusqlite::backup::Backup::new(&self.conn, &mut dest)?;
1419            backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1420        }
1421        Ok(dest)
1422    }
1423
1424    /// Restore the database from another SQLite connection using the backup API.
1425    ///
1426    /// Replaces the current database contents with the source connection's
1427    /// contents. Invalidates the metadata cache.
1428    pub fn restore_from_connection(&mut self, source: &Connection) -> Result<()> {
1429        let backup = rusqlite::backup::Backup::new(source, &mut self.conn)?;
1430        backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1431        self.metadata_cache.borrow_mut().clear();
1432        Ok(())
1433    }
1434
1435    /// Get the database size in bytes for an arbitrary connection.
1436    pub fn connection_size_bytes(conn: &Connection) -> Result<u64> {
1437        let size: i64 = conn.query_row(
1438            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1439            [],
1440            |row| row.get(0),
1441        )?;
1442        Ok(size as u64)
1443    }
1444
1445    // -----------------------------------------------------------------------
1446    // Stream operations
1447    // -----------------------------------------------------------------------
1448
1449    /// Enable streams on a table.
1450    pub fn enable_stream(&self, table_name: &str, view_type: &str, label: &str) -> Result<()> {
1451        self.conn.execute(
1452            "UPDATE _tables SET stream_enabled = 1, stream_view_type = ?1, stream_label = ?2 WHERE table_name = ?3",
1453            params![view_type, label, table_name],
1454        )?;
1455        self.metadata_cache.borrow_mut().remove(table_name);
1456        Ok(())
1457    }
1458
1459    /// Disable streams on a table.
1460    pub fn disable_stream(&self, table_name: &str) -> Result<()> {
1461        self.conn.execute(
1462            "UPDATE _tables SET stream_enabled = 0 WHERE table_name = ?1",
1463            params![table_name],
1464        )?;
1465        self.metadata_cache.borrow_mut().remove(table_name);
1466        Ok(())
1467    }
1468
1469    /// Insert a stream record.
1470    #[allow(clippy::too_many_arguments)]
1471    pub fn insert_stream_record(
1472        &self,
1473        table_name: &str,
1474        event_name: &str,
1475        keys_json: &str,
1476        new_image: Option<&str>,
1477        old_image: Option<&str>,
1478        sequence_number: &str,
1479        shard_id: &str,
1480        created_at: i64,
1481    ) -> Result<()> {
1482        self.insert_stream_record_with_identity(
1483            table_name,
1484            event_name,
1485            keys_json,
1486            new_image,
1487            old_image,
1488            sequence_number,
1489            shard_id,
1490            created_at,
1491            None,
1492        )
1493    }
1494
1495    /// Insert a stream record with optional user identity (for TTL deletions).
1496    #[allow(clippy::too_many_arguments)]
1497    pub fn insert_stream_record_with_identity(
1498        &self,
1499        table_name: &str,
1500        event_name: &str,
1501        keys_json: &str,
1502        new_image: Option<&str>,
1503        old_image: Option<&str>,
1504        sequence_number: &str,
1505        shard_id: &str,
1506        created_at: i64,
1507        user_identity: Option<&str>,
1508    ) -> Result<()> {
1509        self.conn.execute(
1510            "INSERT INTO _stream_records (table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity)
1511             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1512            params![table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity],
1513        )?;
1514        Ok(())
1515    }
1516
1517    /// Get the next sequence number for a table's stream.
1518    pub fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64> {
1519        let result: std::result::Result<i64, _> = self.conn.query_row(
1520            "SELECT COALESCE(MAX(CAST(sequence_number AS INTEGER)), 0) + 1 FROM _stream_records WHERE table_name = ?1",
1521            params![table_name],
1522            |row| row.get(0),
1523        );
1524        match result {
1525            Ok(n) => Ok(n),
1526            Err(_) => Ok(1),
1527        }
1528    }
1529
1530    /// Get stream records for a shard starting after a given sequence number.
1531    pub fn get_stream_records(
1532        &self,
1533        table_name: &str,
1534        shard_id: &str,
1535        after_sequence: i64,
1536        limit: usize,
1537    ) -> Result<Vec<StreamRecord>> {
1538        let mut stmt = self.conn.prepare(
1539            "SELECT event_name, keys_json, new_image, old_image, sequence_number, created_at, user_identity
1540             FROM _stream_records
1541             WHERE table_name = ?1 AND shard_id = ?2 AND CAST(sequence_number AS INTEGER) > ?3
1542             ORDER BY CAST(sequence_number AS INTEGER) ASC
1543             LIMIT ?4",
1544        )?;
1545        let rows = stmt
1546            .query_map(
1547                params![table_name, shard_id, after_sequence, limit as i64],
1548                |row| {
1549                    Ok(StreamRecord {
1550                        event_name: row.get(0)?,
1551                        keys_json: row.get(1)?,
1552                        new_image: row.get(2)?,
1553                        old_image: row.get(3)?,
1554                        sequence_number: row.get(4)?,
1555                        created_at: row.get(5)?,
1556                        user_identity: row.get(6)?,
1557                    })
1558                },
1559            )?
1560            .collect::<std::result::Result<Vec<_>, _>>()?;
1561        Ok(rows)
1562    }
1563
1564    /// List tables that have streams enabled.
1565    pub fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1566        let sql = format!(
1567            "SELECT {} FROM _tables WHERE stream_enabled = 1 ORDER BY table_name",
1568            sql_builders::TABLE_METADATA_COLUMNS
1569        );
1570        let mut stmt = self.conn.prepare(&sql)?;
1571        let rows = stmt
1572            .query_map([], row_to_metadata)?
1573            .collect::<std::result::Result<Vec<_>, _>>()?;
1574        Ok(rows)
1575    }
1576
1577    // -----------------------------------------------------------------------
1578    // TTL operations
1579    // -----------------------------------------------------------------------
1580
1581    /// Update TTL configuration for a table.
1582    pub fn update_ttl_config(
1583        &self,
1584        table_name: &str,
1585        attribute_name: Option<&str>,
1586        enabled: bool,
1587    ) -> Result<()> {
1588        self.conn.execute(
1589            "UPDATE _tables SET ttl_attribute = ?1, ttl_enabled = ?2 WHERE table_name = ?3",
1590            params![attribute_name, enabled as i32, table_name],
1591        )?;
1592        self.metadata_cache.borrow_mut().remove(table_name);
1593        Ok(())
1594    }
1595
1596    /// List tables that have TTL enabled.
1597    pub fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1598        let sql = format!(
1599            "SELECT {} FROM _tables WHERE ttl_enabled = 1 ORDER BY table_name",
1600            sql_builders::TABLE_METADATA_COLUMNS
1601        );
1602        let mut stmt = self.conn.prepare(&sql)?;
1603        let rows = stmt
1604            .query_map([], row_to_metadata)?
1605            .collect::<std::result::Result<Vec<_>, _>>()?;
1606        Ok(rows)
1607    }
1608
1609    /// Get the min and max sequence numbers for a shard.
1610    pub fn get_shard_sequence_range(
1611        &self,
1612        table_name: &str,
1613        shard_id: &str,
1614    ) -> Result<(Option<String>, Option<String>)> {
1615        let result: std::result::Result<(Option<String>, Option<String>), _> = self.conn.query_row(
1616            "SELECT MIN(sequence_number), MAX(sequence_number) FROM _stream_records WHERE table_name = ?1 AND shard_id = ?2",
1617            params![table_name, shard_id],
1618            |row| Ok((row.get(0)?, row.get(1)?)),
1619        );
1620        match result {
1621            Ok(range) => Ok(range),
1622            Err(_) => Ok((None, None)),
1623        }
1624    }
1625
1626    // -----------------------------------------------------------------------
1627    // Cache tracking (cached_at)
1628    // -----------------------------------------------------------------------
1629
1630    /// Update the `cached_at` timestamp for a single item.
1631    pub fn touch_cached_at(
1632        &self,
1633        table_name: &str,
1634        pk: &str,
1635        sk: &str,
1636        timestamp: f64,
1637    ) -> Result<()> {
1638        let sql = format!(
1639            "UPDATE \"{}\" SET cached_at = ?1 WHERE pk = ?2 AND sk = ?3",
1640            escape_table_name(table_name)
1641        );
1642        self.conn.execute(&sql, params![timestamp, pk, sk])?;
1643        Ok(())
1644    }
1645
1646    /// Get items ordered by `cached_at` (oldest first) for LRU eviction.
1647    ///
1648    /// Returns `(pk, sk, item_size)` tuples. Items with NULL `cached_at`
1649    /// are excluded (they were never cached from a remote source).
1650    pub fn get_lru_items(
1651        &self,
1652        table_name: &str,
1653        limit: usize,
1654    ) -> Result<Vec<(String, String, i64)>> {
1655        let sql = format!(
1656            "SELECT pk, sk, item_size FROM \"{}\" WHERE cached_at IS NOT NULL ORDER BY cached_at ASC LIMIT ?1",
1657            escape_table_name(table_name)
1658        );
1659        let mut stmt = self.conn.prepare(&sql)?;
1660        let rows = stmt
1661            .query_map(params![limit as i64], |row| {
1662                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1663            })?
1664            .collect::<std::result::Result<Vec<_>, _>>()?;
1665        Ok(rows)
1666    }
1667}
1668
/// A stream record from the `_stream_records` table.
///
/// Rows are decoded positionally (columns 0..=6, in field order) by the
/// stream-record read query in `Storage`.
#[derive(Debug, Clone)]
pub struct StreamRecord {
    /// Event name recorded for the change.
    pub event_name: String,
    /// Key attributes of the affected item, serialized as JSON (per the field
    /// name; TODO confirm the exact shape).
    pub keys_json: String,
    /// New item image, if one was recorded (presumably serialized JSON; confirm).
    pub new_image: Option<String>,
    /// Old item image, if one was recorded (presumably serialized JSON; confirm).
    pub old_image: Option<String>,
    /// Sequence number, stored as text. The read query filters and orders on
    /// it via `CAST(sequence_number AS INTEGER)`, i.e. numerically.
    pub sequence_number: String,
    /// Creation timestamp of the record (units not visible here; TODO confirm).
    pub created_at: i64,
    /// Optional user identity attached to the record; `None` when absent.
    pub user_identity: Option<String>,
}
1680
/// Per-table statistics returned by `Storage::table_stats()`.
#[derive(Debug, Clone)]
pub struct TableStats {
    /// Name of the table these statistics describe.
    pub table_name: String,
    /// Number of items in the table.
    pub item_count: i64,
    /// Size of the table in bytes (how it is measured is not visible here;
    /// TODO confirm).
    pub size_bytes: u64,
}
1688
/// Combined database introspection info returned by `Storage::database_info()`.
#[derive(Debug, Clone)]
pub struct DatabaseInfo {
    /// Database file path; presumably `None` for in-memory databases (confirm).
    pub path: Option<String>,
    /// Total database size in bytes.
    pub size_bytes: u64,
    /// Number of tables reported.
    pub table_count: usize,
    /// Per-table stats and metadata.
    pub tables: Vec<TableInfoEntry>,
}
1697
/// Per-table stats + metadata for `DatabaseInfo`.
#[derive(Debug, Clone)]
pub struct TableInfoEntry {
    /// Size and item-count statistics for the table.
    pub stats: TableStats,
    /// The table's `_tables` metadata row; presumably `None` when no such row
    /// was found (confirm against `database_info`).
    pub metadata: Option<TableMetadata>,
}
1704
/// Metadata row from the `_tables` table.
///
/// Note: The `tags` column is intentionally excluded. Tags are not on the hot
/// path for item operations and are accessed via separate `get_tags`/`set_tags`
/// methods to keep the metadata cache lean.
///
/// `row_to_metadata` fills the fields positionally, in declaration order
/// (column 0 = `table_name` through column 18 = `table_id`), so the field order
/// must stay in sync with the SELECT column list.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// Table name (primary key of `_tables`).
    pub table_name: String,
    /// Key schema as JSON, e.g. `[{"AttributeName":"pk","KeyType":"HASH"}]`.
    pub key_schema: String,
    /// Attribute definitions as JSON, e.g.
    /// `[{"AttributeName":"pk","AttributeType":"S"}]`.
    pub attribute_definitions: String,
    /// Global secondary index definitions, if any (presumably JSON; confirm).
    pub gsi_definitions: Option<String>,
    /// Local secondary index definitions, if any (presumably JSON; confirm).
    pub lsi_definitions: Option<String>,
    /// Decoded from an INTEGER column: any non-zero value means enabled.
    pub stream_enabled: bool,
    /// Stream view type, e.g. `NEW_AND_OLD_IMAGES`; `None` when not set.
    pub stream_view_type: Option<String>,
    /// Stream label recorded when the stream was enabled; `None` when not set.
    pub stream_label: Option<String>,
    /// Name of the TTL attribute; `None` when not configured.
    pub ttl_attribute: Option<String>,
    /// Decoded from an INTEGER column: any non-zero value means enabled.
    pub ttl_enabled: bool,
    /// Table creation timestamp (units not visible here; TODO confirm).
    pub created_at: i64,
    /// Table status string; the column defaults to `ACTIVE`.
    pub table_status: String,
    /// Billing mode; older schemas default the column to `PAY_PER_REQUEST`.
    pub billing_mode: Option<String>,
    /// Provisioned throughput settings, if any.
    pub provisioned_throughput: Option<String>,
    /// Server-side encryption specification, if any.
    pub sse_specification: Option<String>,
    /// Table class, if any.
    pub table_class: Option<String>,
    /// Decoded leniently: an unreadable column value is treated as `false`.
    pub deletion_protection_enabled: bool,
    /// On-demand throughput settings, if any (column added by the v7 migration).
    pub on_demand_throughput: Option<String>,
    /// Stable per-table identifier assigned at create time (#55). `None` only
    /// for rows that predate the v8 migration's backfill.
    pub table_id: Option<String>,
}
1734
/// Decode one row of a `_tables` SELECT into a [`TableMetadata`].
///
/// Columns are read by position, so the SELECT list must yield them in the
/// same order as the struct's fields (index 0 = `table_name` through index
/// 18 = `table_id`). Integer flag columns are decoded as "non-zero = true".
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
fn row_to_metadata(row: &rusqlite::Row) -> rusqlite::Result<TableMetadata> {
    // Reads an INTEGER flag column and maps any non-zero value to `true`.
    let flag_at = |idx: usize| -> rusqlite::Result<bool> { Ok(row.get::<_, i32>(idx)? != 0) };

    let metadata = TableMetadata {
        table_name: row.get(0)?,
        key_schema: row.get(1)?,
        attribute_definitions: row.get(2)?,
        gsi_definitions: row.get(3)?,
        lsi_definitions: row.get(4)?,
        stream_enabled: flag_at(5)?,
        stream_view_type: row.get(6)?,
        stream_label: row.get(7)?,
        ttl_attribute: row.get(8)?,
        ttl_enabled: flag_at(9)?,
        created_at: row.get(10)?,
        table_status: row.get(11)?,
        billing_mode: row.get(12)?,
        provisioned_throughput: row.get(13)?,
        sse_specification: row.get(14)?,
        table_class: row.get(15)?,
        // Deliberately lenient: if this column cannot be read as an integer
        // (e.g. it holds NULL), treat deletion protection as off instead of
        // failing the whole row.
        deletion_protection_enabled: flag_at(16).unwrap_or(false),
        on_demand_throughput: row.get(17)?,
        table_id: row.get(18)?,
    };
    Ok(metadata)
}
1760
// Unit tests for the SQLite-backed storage layer: schema initialization and
// migrations, pragmas, the fnv1a SQL function, table/item CRUD, query/scan,
// the metadata cache, and the parallel-scan hash-prefix helpers.
#[cfg(all(test, any(feature = "native-sqlite", feature = "_has-encryption")))]
mod tests {
    use super::*;

    /// Builds a new in-memory `Storage` for a test.
    fn test_storage() -> Storage {
        Storage::memory().expect("Failed to create in-memory storage")
    }

    #[test]
    fn test_initialize_creates_metadata_tables() {
        let storage = test_storage();
        // _config and _tables should exist
        let version: String = storage
            .conn()
            .query_row(
                "SELECT value FROM _config WHERE key = 'schema_version'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(version, SCHEMA_VERSION);
    }

    #[test]
    fn test_migrate_v6_to_v7_adds_on_demand_throughput_column() {
        // Issue #44: the on_demand_throughput column must be added to existing
        // on-disk databases through the versioned migration chain, not just the
        // fresh CREATE — otherwise DescribeTable on a pre-existing table errors.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();

        // Build a v6-shape database by hand: _tables without on_demand_throughput,
        // schema_version pinned at 6, and one pre-existing table row.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch(
                "CREATE TABLE _config (key TEXT PRIMARY KEY, value TEXT NOT NULL);
                 CREATE TABLE _tables (
                    table_name TEXT PRIMARY KEY,
                    key_schema TEXT NOT NULL,
                    attribute_definitions TEXT NOT NULL,
                    gsi_definitions TEXT,
                    lsi_definitions TEXT,
                    stream_enabled INTEGER DEFAULT 0,
                    stream_view_type TEXT,
                    stream_label TEXT,
                    ttl_attribute TEXT,
                    ttl_enabled INTEGER DEFAULT 0,
                    created_at INTEGER NOT NULL,
                    table_status TEXT NOT NULL DEFAULT 'ACTIVE',
                    billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
                    provisioned_throughput TEXT,
                    tags TEXT,
                    sse_specification TEXT,
                    table_class TEXT,
                    deletion_protection_enabled INTEGER DEFAULT 0
                 );",
            )
            .unwrap();
            conn.execute(
                "INSERT INTO _config (key, value) VALUES ('schema_version', '6')",
                [],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO _tables (table_name, key_schema, attribute_definitions, created_at) \
                 VALUES ('LegacyTable', ?1, ?2, 0)",
                params![
                    r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
                    r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
                ],
            )
            .unwrap();
        }

        // Reopening runs the migration chain; v6 -> v7 adds the column, and the
        // chain continues to the current SCHEMA_VERSION.
        let storage = Storage::new(&path).unwrap();
        let version: String = storage
            .conn()
            .query_row(
                "SELECT value FROM _config WHERE key = 'schema_version'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(version, SCHEMA_VERSION);

        // The pre-existing row survives and reads back through row_to_metadata,
        // with the new column present and NULL.
        let meta = storage.get_table_metadata("LegacyTable").unwrap().unwrap();
        assert_eq!(meta.table_name, "LegacyTable");
        assert!(meta.on_demand_throughput.is_none());

        // A bare SELECT of the new column would error if the ALTER had not run.
        let col: Option<String> = storage
            .conn()
            .query_row(
                "SELECT on_demand_throughput FROM _tables WHERE table_name = 'LegacyTable'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert!(col.is_none());
    }

    /// #55: the v7 -> v8 migration adds the `table_id` column and backfills any
    /// pre-existing table with a one-time random id, so a legacy table reports a
    /// stable TableId from then on.
    #[test]
    fn test_migrate_v7_to_v8_backfills_table_id() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_str().unwrap().to_string();

        // Build a v7-shape database by hand: _tables with on_demand_throughput
        // but no table_id, schema_version pinned at 7, one pre-existing row.
        {
            let conn = Connection::open(&path).unwrap();
            conn.execute_batch(
                "CREATE TABLE _config (key TEXT PRIMARY KEY, value TEXT NOT NULL);
                 CREATE TABLE _tables (
                    table_name TEXT PRIMARY KEY,
                    key_schema TEXT NOT NULL,
                    attribute_definitions TEXT NOT NULL,
                    gsi_definitions TEXT,
                    lsi_definitions TEXT,
                    stream_enabled INTEGER DEFAULT 0,
                    stream_view_type TEXT,
                    stream_label TEXT,
                    ttl_attribute TEXT,
                    ttl_enabled INTEGER DEFAULT 0,
                    created_at INTEGER NOT NULL,
                    table_status TEXT NOT NULL DEFAULT 'ACTIVE',
                    billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
                    provisioned_throughput TEXT,
                    tags TEXT,
                    sse_specification TEXT,
                    table_class TEXT,
                    deletion_protection_enabled INTEGER DEFAULT 0,
                    on_demand_throughput TEXT
                 );",
            )
            .unwrap();
            conn.execute(
                "INSERT INTO _config (key, value) VALUES ('schema_version', '7')",
                [],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO _tables (table_name, key_schema, attribute_definitions, created_at) \
                 VALUES ('LegacyTable', ?1, ?2, 0)",
                params![
                    r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
                    r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
                ],
            )
            .unwrap();
        }

        // Reopening runs the migration chain through v8.
        let storage = Storage::new(&path).unwrap();
        let version: String = storage
            .conn()
            .query_row(
                "SELECT value FROM _config WHERE key = 'schema_version'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(version, SCHEMA_VERSION);

        // The legacy row was backfilled with a non-empty table_id, and it reads
        // back through row_to_metadata.
        let meta = storage.get_table_metadata("LegacyTable").unwrap().unwrap();
        let id = meta.table_id.expect("legacy table should be backfilled");
        assert!(!id.is_empty());

        // Reopening again keeps the same id (stable, not regenerated).
        drop(storage);
        let storage2 = Storage::new(&path).unwrap();
        let meta2 = storage2.get_table_metadata("LegacyTable").unwrap().unwrap();
        assert_eq!(meta2.table_id.as_deref(), Some(id.as_str()));
    }

    #[test]
    fn test_wal_mode_enabled() {
        let storage = test_storage();
        let mode: String = storage
            .conn()
            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
            .unwrap();
        // In-memory databases may report "memory" instead of "wal"
        assert!(mode == "wal" || mode == "memory", "Got mode: {mode}");
    }

    // FNV-1a parity contract. The wasm bridge hashes through js/fnv1a.js, which
    // parallel-scan segment assignment relies on agreeing with this native
    // scalar byte-for-byte. These vectors are the shared truth: the JS unit test
    // js/fnv1a.test.js asserts the same inputs hash to the same values, pinning
    // the two implementations together.
    #[test]
    fn fnv1a_hash_matches_known_vectors() {
        let storage = test_storage();
        // (input, expected value returned by the `fnv1a_hash` SQL function)
        let cases: [(&str, i64); 6] = [
            ("", 2166136261),
            ("a", 3826002220),
            ("u#1", 2199603432),
            ("artist#42", 2385694177),
            ("café", 2821410889),
            ("tenant#9007199254740993", 2022216178),
        ];
        for (input, expected) in cases {
            let got: i64 = storage
                .conn()
                .query_row("SELECT fnv1a_hash(?1)", [input], |row| row.get(0))
                .unwrap();
            assert_eq!(got, expected, "fnv1a_hash({input:?})");
        }
    }

    #[test]
    fn test_table_metadata_crud() {
        let storage = test_storage();

        // Initially no tables
        assert!(!storage.table_exists("TestTable").unwrap());
        assert!(storage.list_table_names().unwrap().is_empty());

        // Insert metadata
        storage
            .insert_table_metadata(&CreateTableMetadata {
                table_name: "TestTable",
                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
                created_at: 1000000,
                ..Default::default()
            })
            .unwrap();

        assert!(storage.table_exists("TestTable").unwrap());
        assert_eq!(storage.list_table_names().unwrap(), vec!["TestTable"]);

        // Get metadata (status was not supplied, so this checks the default)
        let meta = storage.get_table_metadata("TestTable").unwrap().unwrap();
        assert_eq!(meta.table_name, "TestTable");
        assert_eq!(meta.table_status, "ACTIVE");
        assert_eq!(meta.created_at, 1000000);

        // Delete metadata
        assert!(storage.delete_table_metadata("TestTable").unwrap());
        assert!(!storage.table_exists("TestTable").unwrap());
    }

    #[test]
    fn test_create_and_drop_data_table() {
        let storage = test_storage();
        storage.create_data_table("MyTable").unwrap();

        // Should be able to insert into it (empty sort key, trailing arg is
        // presumably the item size; confirm against put_item)
        storage
            .put_item("MyTable", "pk1", "", r#"{"pk":{"S":"pk1"}}"#, 10)
            .unwrap();

        let item = storage.get_item("MyTable", "pk1", "").unwrap();
        assert!(item.is_some());

        storage.drop_data_table("MyTable").unwrap();
    }

    #[test]
    fn test_item_crud() {
        let storage = test_storage();
        storage.create_data_table("Items").unwrap();

        // Put item
        let old = storage
            .put_item(
                "Items",
                "user#1",
                "profile",
                r#"{"name":{"S":"Alice"}}"#,
                20,
            )
            .unwrap();
        assert!(old.is_none()); // No previous item

        // Get item
        let item = storage.get_item("Items", "user#1", "profile").unwrap();
        assert_eq!(item.unwrap(), r#"{"name":{"S":"Alice"}}"#);

        // Replace item (returns old)
        let old = storage
            .put_item("Items", "user#1", "profile", r#"{"name":{"S":"Bob"}}"#, 18)
            .unwrap();
        assert_eq!(old.unwrap(), r#"{"name":{"S":"Alice"}}"#);

        // Delete item (returns the deleted item's JSON)
        let deleted = storage.delete_item("Items", "user#1", "profile").unwrap();
        assert_eq!(deleted.unwrap(), r#"{"name":{"S":"Bob"}}"#);

        // Item should be gone
        assert!(
            storage
                .get_item("Items", "user#1", "profile")
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn test_query_items() {
        let storage = test_storage();
        storage.create_data_table("Orders").unwrap();

        // Insert several items for the same partition key
        for i in 1..=5 {
            let sk = format!("order#{i:03}");
            let json = format!(r#"{{"id":{{"N":"{i}"}}}}"#);
            storage
                .put_item("Orders", "user#1", &sk, &json, 10)
                .unwrap();
        }

        // Query all for partition key; each result's `.1` is the sort key
        let results = storage
            .query_items(
                "Orders",
                "user#1",
                &QueryParams {
                    forward: true,
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(results.len(), 5);
        assert_eq!(results[0].1, "order#001"); // Sorted ascending

        // Query with limit
        let results = storage
            .query_items(
                "Orders",
                "user#1",
                &QueryParams {
                    forward: true,
                    limit: Some(2),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(results.len(), 2);

        // Query reverse
        let results = storage
            .query_items(
                "Orders",
                "user#1",
                &QueryParams {
                    forward: false,
                    limit: Some(2),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].1, "order#005"); // Sorted descending
    }

    #[test]
    fn test_scan_items() {
        let storage = test_storage();
        storage.create_data_table("ScanTest").unwrap();

        storage.put_item("ScanTest", "a", "1", r#"{}"#, 2).unwrap();
        storage.put_item("ScanTest", "b", "2", r#"{}"#, 2).unwrap();
        storage.put_item("ScanTest", "c", "3", r#"{}"#, 2).unwrap();

        let results = storage.scan_items("ScanTest", &Default::default()).unwrap();
        assert_eq!(results.len(), 3);

        // Scan with limit
        let results = storage
            .scan_items(
                "ScanTest",
                &ScanParams {
                    limit: Some(2),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(results.len(), 2);

        // Scan with pagination: resume strictly after (pk "a", sk "1");
        // each result's `.0` is the partition key
        let results = storage
            .scan_items(
                "ScanTest",
                &ScanParams {
                    limit: Some(2),
                    exclusive_start_pk: Some("a"),
                    exclusive_start_sk: Some("1"),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0, "b"); // Skipped "a"
    }

    #[test]
    fn test_count_items() {
        let storage = test_storage();
        storage.create_data_table("CountTest").unwrap();

        assert_eq!(storage.count_items("CountTest").unwrap(), 0);

        storage.put_item("CountTest", "a", "", r#"{}"#, 2).unwrap();
        storage.put_item("CountTest", "b", "", r#"{}"#, 2).unwrap();

        assert_eq!(storage.count_items("CountTest").unwrap(), 2);
    }

    #[test]
    fn test_gsi_table_lifecycle() {
        let storage = test_storage();
        storage.create_gsi_table("Orders", "ByDate").unwrap();

        // Should be able to write to the GSI table via raw SQL. The physical
        // table name below is hard-coded to what create_gsi_table is expected
        // to produce for ("Orders", "ByDate").
        let gsi_name = "Orders::gsi::ByDate";
        let sql = format!(
            "INSERT INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
            gsi_name.replace('"', "\"\"")
        );
        storage
            .conn()
            .execute(
                &sql,
                params!["2024-01-01", "001", "user#1", "order#001", r#"{}"#],
            )
            .unwrap();

        storage.drop_gsi_table("Orders", "ByDate").unwrap();
    }

    #[test]
    fn test_nonexistent_table_metadata() {
        let storage = test_storage();
        assert!(storage.get_table_metadata("Nonexistent").unwrap().is_none());
        assert!(!storage.delete_table_metadata("Nonexistent").unwrap());
    }

    #[test]
    fn test_metadata_cache_hit() {
        let storage = test_storage();
        storage
            .insert_table_metadata(&CreateTableMetadata {
                table_name: "CacheTest",
                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
                created_at: 1000000,
                ..Default::default()
            })
            .unwrap();

        // First call populates cache
        let meta1 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
        assert_eq!(meta1.table_name, "CacheTest");

        // Second call should hit cache (same result)
        let meta2 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
        assert_eq!(meta2.table_name, "CacheTest");
        assert_eq!(meta1.created_at, meta2.created_at);

        // Cache should have the entry
        assert!(storage.metadata_cache.borrow().contains_key("CacheTest"));
    }

    #[test]
    fn test_metadata_cache_invalidated_on_delete() {
        let storage = test_storage();
        storage
            .insert_table_metadata(&CreateTableMetadata {
                table_name: "DelCache",
                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
                created_at: 1000000,
                ..Default::default()
            })
            .unwrap();

        // Populate cache
        storage.get_table_metadata("DelCache").unwrap();
        assert!(storage.metadata_cache.borrow().contains_key("DelCache"));

        // Delete should invalidate cache
        storage.delete_table_metadata("DelCache").unwrap();
        assert!(!storage.metadata_cache.borrow().contains_key("DelCache"));
    }

    #[test]
    fn test_metadata_cache_invalidated_on_stream_enable() {
        let storage = test_storage();
        storage
            .insert_table_metadata(&CreateTableMetadata {
                table_name: "StreamCache",
                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
                created_at: 1000000,
                ..Default::default()
            })
            .unwrap();

        // Populate cache
        let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
        assert!(!meta.stream_enabled);

        // Enable stream should invalidate cache
        storage
            .enable_stream("StreamCache", "NEW_AND_OLD_IMAGES", "2024-01-01T00:00:00")
            .unwrap();
        assert!(!storage.metadata_cache.borrow().contains_key("StreamCache"));

        // Next get should reflect the change
        let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
        assert!(meta.stream_enabled);
    }

    #[test]
    fn test_metadata_cache_invalidated_on_ttl_update() {
        let storage = test_storage();
        storage
            .insert_table_metadata(&CreateTableMetadata {
                table_name: "TtlCache",
                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
                created_at: 1000000,
                ..Default::default()
            })
            .unwrap();

        // Populate cache
        let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
        assert!(!meta.ttl_enabled);

        // Update TTL should invalidate cache
        storage
            .update_ttl_config("TtlCache", Some("expires_at"), true)
            .unwrap();
        assert!(!storage.metadata_cache.borrow().contains_key("TtlCache"));

        // Next get should reflect the change
        let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
        assert!(meta.ttl_enabled);
        assert_eq!(meta.ttl_attribute, Some("expires_at".to_string()));
    }

    #[test]
    fn test_num_to_buffer_zero() {
        // numToBuffer("0") → [0x80]
        assert_eq!(num_to_buffer("0"), vec![0x80]);
        assert_eq!(num_to_buffer("-0"), vec![0x80]);
    }

    #[test]
    fn test_hash_prefix_string_keys() {
        // Verify known hash prefixes for string keys used in scan tests.
        // These specific values determine which segment items land in.
        let h1 = compute_hash_prefix(&AttributeValue::S("3635".into()));
        let h2 = compute_hash_prefix(&AttributeValue::S("228".into()));
        let h3 = compute_hash_prefix(&AttributeValue::S("1668".into()));
        let h4 = compute_hash_prefix(&AttributeValue::S("3435".into()));

        // With TotalSegments=4096, segment 0 owns bucket 0 only.
        // Items "3635" and "228" must be in segment 0 (bucket 0).
        assert_eq!(
            hash_bucket(&h1),
            0,
            "3635 should be bucket 0, got hash {h1}"
        );
        assert_eq!(hash_bucket(&h2), 0, "228 should be bucket 0, got hash {h2}");

        // "1668" must be in segment 1 (bucket 1)
        assert_eq!(
            hash_bucket(&h3),
            1,
            "1668 should be bucket 1, got hash {h3}"
        );

        // "3435" must be in segment 4 (bucket 4)
        assert_eq!(
            hash_bucket(&h4),
            4,
            "3435 should be bucket 4, got hash {h4}"
        );
    }

    #[test]
    fn test_hash_prefix_number_keys() {
        // Verify number key hash prefixes from scan tests.
        // "251" must be in segment 1 (bucket 1) with TotalSegments=4096
        let h1 = compute_hash_prefix(&AttributeValue::N("251".into()));
        assert_eq!(hash_bucket(&h1), 1, "251 should be bucket 1, got hash {h1}");

        // "2388" must be in segment 4095 (bucket 4095)
        let h2 = compute_hash_prefix(&AttributeValue::N("2388".into()));
        assert_eq!(
            hash_bucket(&h2),
            4095,
            "2388 should be bucket 4095, got hash {h2}"
        );
    }

    // The vectors below are consistent with the bucket being the first three
    // hex digits of the hash prefix (0x000..=0xfff, i.e. 0..=4095), split
    // evenly across `total` segments.
    #[test]
    fn test_hash_in_segment() {
        // bucket 0 should be in segment 0 of 4096
        assert!(hash_in_segment("000000", 0, 4096));
        assert!(!hash_in_segment("000000", 1, 4096));

        // bucket 1 should be in segment 1 of 4096
        assert!(hash_in_segment("001000", 1, 4096));
        assert!(!hash_in_segment("001000", 0, 4096));

        // bucket 4095 should be in segment 4095 of 4096
        assert!(hash_in_segment("fff000", 4095, 4096));
        assert!(!hash_in_segment("fff000", 0, 4096));

        // With 2 segments: buckets 0-2047 in segment 0, 2048-4095 in segment 1
        assert!(hash_in_segment("000000", 0, 2));
        assert!(hash_in_segment("7ff000", 0, 2));
        assert!(hash_in_segment("800000", 1, 2));
        assert!(hash_in_segment("fff000", 1, 2));
    }
}