Skip to main content

dynoxide/
storage.rs

1use crate::errors::{DynoxideError, Result};
2use crate::types::AttributeValue;
3use rusqlite::{Connection, params};
4use std::cell::RefCell;
5use std::collections::HashMap;
6
/// Schema version recorded under the `schema_version` key of `_config`.
/// New databases are stamped with this value; older ones are migrated up to it.
const SCHEMA_VERSION: &str = "6";

/// Size of the hash space used to assign items to parallel-scan segments:
/// one bucket per three-hex-digit hash prefix (`0x1000` == 4096), as in dynalite.
const HASH_BUCKETS: u32 = 0x1000;
13
14// ---------------------------------------------------------------------------
15// MD5-based hash prefix for parallel scan (dynalite-compatible)
16// ---------------------------------------------------------------------------
17
18/// Compute the 6-character hex hash prefix for a partition key value.
19///
20/// Uses `MD5("Outliers" + key_bytes)` where `key_bytes` depends on the
21/// attribute type:
22/// - `S`: UTF-8 bytes of the string
23/// - `N`: Oracle packed BCD encoding via [`num_to_buffer`]
24/// - `B`: raw binary bytes
25///
26/// This matches dynalite's `hashPrefix` function.
27pub fn compute_hash_prefix(pk_value: &AttributeValue) -> String {
28    let key_bytes = match pk_value {
29        AttributeValue::S(s) => s.as_bytes().to_vec(),
30        AttributeValue::N(n) => num_to_buffer(n),
31        AttributeValue::B(b) => b.clone(),
32        _ => vec![], // Should not happen for valid keys
33    };
34
35    let digest = md5::compute([b"Outliers" as &[u8], &key_bytes].concat());
36    format!("{:032x}", digest)[..6].to_string()
37}
38
/// Compute the hash bucket (0..=4095) from a hash prefix.
///
/// The bucket is the first three characters of `hash_prefix` read as
/// hexadecimal, so `"abc123"` maps to `0xabc`. A shorter prefix uses whatever
/// characters it has (`"ff"` → 255).
///
/// Returns 0 in these cases:
/// - the prefix is empty (data-table rows default `hash_prefix` to `''`);
/// - the prefix is not valid hexadecimal;
/// - byte offset 3 is not a UTF-8 character boundary. Slicing there would
///   panic, and such a prefix contains a non-hex character anyway.
pub fn hash_bucket(hash_prefix: &str) -> u32 {
    let end = hash_prefix.len().min(3);
    hash_prefix
        .get(..end)
        .and_then(|prefix| u32::from_str_radix(prefix, 16).ok())
        .unwrap_or(0)
}
44
/// Convert a DynamoDB number string to Oracle-style packed BCD bytes.
///
/// Faithfully ports dynalite's `numToBuffer` function from `db/index.js`.
/// The JS version works on Big.js fields: `num.s` (sign), `num.c`
/// (coefficient digits) and `num.e`. Here:
/// - the sign comes from `BigDecimal::sign`;
/// - the digits and exponent come from `extract_mantissa_and_exponent`, whose
///   exponent is the number of digits before the decimal point (Big.js `e + 1`).
///
/// Output layout:
/// - Byte 0 is `floor((exponent + append_zero) / 2) - 64`, truncated to its
///   low 8 bits and bitwise-inverted for negative numbers.
/// - The following bytes pack the mantissa two decimal digits per byte
///   (`10 * first + second`). An extra leading `0` digit is inserted when the
///   exponent is odd. Each finished byte is stored as `value + 1` for positive
///   numbers and `101 - value` for negative ones.
/// - Negative numbers whose mantissa fits in fewer than 20 bytes get a
///   trailing `102` byte.
///
/// Zero, empty or whitespace-only input, and strings `BigDecimal` cannot parse
/// all encode as the single byte `0x80`.
fn num_to_buffer(num_str: &str) -> Vec<u8> {
    let trimmed = num_str.trim();
    if trimmed.is_empty() {
        return vec![0x80];
    }

    use bigdecimal::BigDecimal;
    use std::str::FromStr;

    let bd = match BigDecimal::from_str(trimmed) {
        Ok(v) => v,
        // Unparseable input is encoded like zero rather than reported as an error.
        Err(_) => return vec![0x80],
    };

    // Zero has its own fixed encoding.
    if bd.sign() == bigdecimal::num_bigint::Sign::NoSign {
        return vec![0x80];
    }

    // Digits are taken from the magnitude; the sign is applied via `is_negative`.
    let is_negative = bd.sign() == bigdecimal::num_bigint::Sign::Minus;
    let bd_abs = if is_negative { -&bd } else { bd.clone() };

    let (mantissa, exponent) = extract_mantissa_and_exponent(&bd_abs);
    if mantissa.is_empty() {
        return vec![0x80];
    }

    // JS: scale = num.s (-1 for negative, 1 for positive) — `is_negative` plays that role.
    // JS: appendZero = exponent % 2 ? 1 : 0
    // Odd exponents (including negative ones, where % yields -1) get a padding
    // zero digit. Presumably this keeps digit pairs aligned to the decimal point.
    let append_zero: i64 = if exponent % 2 != 0 { 1 } else { 0 };
    // Mantissa bytes: (digits + padding zero) paired up, rounding up.
    let byte_len_no_exp = ((mantissa.len() as i64 + append_zero + 1) / 2) as usize;

    // Slot 0 is the exponent byte. Short negative numbers reserve one more
    // trailing slot for the 102 terminator byte.
    let mut byte_array: Vec<u8>;
    if byte_len_no_exp < 20 && is_negative {
        byte_array = vec![0u8; byte_len_no_exp + 2];
        byte_array[byte_len_no_exp + 1] = 102;
    } else {
        byte_array = vec![0u8; byte_len_no_exp + 1];
    }

    // byteArray[0] = Math.floor((exponent + appendZero) / 2) - 64
    // JS Math.floor(-3 / 2) is -2, but Rust's `/` truncates toward zero (-1),
    // so floor_div is used to keep JS semantics. (exponent + append_zero is
    // always even here, so the two actually agree; floor_div is kept for fidelity.)
    let exp_sum = exponent + append_zero;
    let exp_byte_val = floor_div(exp_sum, 2) - 64;
    if is_negative {
        // byteArray[0] ^= 0xffffffff — JS bitwise XOR on a number.
        // XOR with all ones is a bitwise NOT; `as u8` keeps the low byte, like
        // Node's Buffer.from truncating array entries to 0..=255.
        byte_array[0] = (exp_byte_val ^ !0i64) as u8;
    } else {
        // Negative values wrap to their two's-complement low byte (e.g. -62 → 194).
        byte_array[0] = exp_byte_val as u8;
    }

    // The main loop faithfully mirrors the JS for loop.
    // JS uses mantissaIndex as a signed int that can be decremented to -1.
    let mut mi: i64 = 0; // mantissaIndex (signed to allow -1)
    let mlen = mantissa.len() as i64;
    let mut appended_zero = false;

    while mi < mlen {
        let bai = ((mi + append_zero) / 2 + 1) as usize; // byteArrayIndex
        if append_zero != 0 && mi == 0 && !appended_zero {
            // Insert the padding zero digit as the high half of byte 1.
            // Decrementing makes the next iteration revisit mantissa digit 0,
            // which then lands in the low half of the same byte. With mi == -1
            // the finalise check below is false, so byte 1 is not finalised yet.
            byte_array[bai] = 0;
            appended_zero = true;
            mi -= 1; // JS: mantissaIndex--
        } else if (mi + append_zero) % 2 == 0 {
            // First digit of a pair: tens position.
            byte_array[bai] = mantissa[mi as usize] * 10;
        } else {
            // Second digit of a pair: units position.
            byte_array[bai] += mantissa[mi as usize];
        }

        // Finalise byte: if odd position or last mantissa digit
        if ((mi + append_zero) % 2 != 0) || (mi == mlen - 1) {
            if is_negative {
                // Invert the pair value so negatives sort in reverse.
                byte_array[bai] = 101u8.wrapping_sub(byte_array[bai]);
            } else {
                byte_array[bai] = byte_array[bai].wrapping_add(1);
            }
        }

        mi += 1; // JS: for loop increment
    }

    byte_array
}
134
/// Divide `a` by `b`, rounding toward negative infinity.
///
/// Rust's `/` truncates toward zero, while JavaScript's `Math.floor(a / b)`
/// rounds down. This reproduces the JS result for integers, e.g.
/// `floor_div(-3, 2) == -2` rather than `-1`.
///
/// Panics like `/` does (division by zero, `i64::MIN / -1`).
fn floor_div(a: i64, b: i64) -> i64 {
    let quotient = a / b;
    let remainder = a % b;
    // A non-zero remainder whose sign differs from the divisor's means
    // truncation rounded the quotient up, so step down by one.
    let rounded_up = remainder != 0 && (remainder < 0) != (b < 0);
    if rounded_up {
        quotient - 1
    } else {
        quotient
    }
}
141
142/// Extract mantissa digits and exponent from a BigDecimal.
143///
144/// Returns (digits, exponent) matching Big.js semantics where:
145/// - digits: array of individual digits [2, 5, 1] for "251"
146/// - exponent: number of digits before the decimal point
147///   e.g., "251" → exponent=3, "0.012345" → exponent=-1
148fn extract_mantissa_and_exponent(bd: &bigdecimal::BigDecimal) -> (Vec<u8>, i64) {
149    // Normalize to remove trailing zeros
150    let normalized = bd.normalized();
151
152    // Get the string representation without scientific notation
153    // We need the digits and the position of the decimal point
154    let (bigint, scale) = normalized.as_bigint_and_exponent();
155    let digits_str = bigint.to_string();
156    let digits_str = digits_str.trim_start_matches('-');
157
158    let digits: Vec<u8> = digits_str
159        .chars()
160        .map(|c| c.to_digit(10).unwrap() as u8)
161        .collect();
162
163    // scale = number of digits after decimal point in the representation
164    // exponent (Big.js style) = number_of_digits - scale = digits.len() as i64 - scale
165    let exponent = digits.len() as i64 - scale;
166
167    (digits, exponent)
168}
169
170/// Check whether a hash_prefix falls within the range for a given segment.
171///
172/// Uses dynalite's 4096-bucket scheme:
173/// - bucket = parseInt(hash_prefix[0..3], 16)
174/// - segment owns buckets from ceil(4096 * segment / total) to
175///   ceil(4096 * (segment+1) / total) - 1
176pub fn hash_in_segment(hash_prefix: &str, segment: u32, total_segments: u32) -> bool {
177    let bucket = hash_bucket(hash_prefix);
178    let start = ceiling_div(HASH_BUCKETS * segment, total_segments);
179    let end = ceiling_div(HASH_BUCKETS * (segment + 1), total_segments) - 1;
180    bucket >= start && bucket <= end
181}
182
/// `a / b` rounded up to the next integer. Panics if `b` is 0.
fn ceiling_div(a: u32, b: u32) -> u32 {
    // Any remainder means the exact quotient had a fractional part.
    a / b + u32::from(a % b != 0)
}
186
/// Parameters for scan operations (base table or GSI).
///
/// String fields borrow caller-owned data for the duration of the call.
/// `Default` leaves every field `None` (no limit, no cursor, no segmentation).
#[derive(Debug, Default)]
pub struct ScanParams<'a> {
    /// Optional cap on the number of items; presumably mirrors DynamoDB's
    /// `Limit` parameter — confirm against the caller.
    pub limit: Option<usize>,
    /// Partition-key part of the pagination cursor (`ExclusiveStartKey`);
    /// presumably in the same encoded form as the stored key column — confirm.
    pub exclusive_start_pk: Option<&'a str>,
    /// Sort-key part of the pagination cursor.
    pub exclusive_start_sk: Option<&'a str>,
    /// Parallel-scan segment index; used together with `total_segments`.
    pub segment: Option<u32>,
    /// Parallel-scan segment count.
    pub total_segments: Option<u32>,
    /// For LSI pagination: base table PK for composite cursor.
    pub exclusive_start_base_pk: Option<&'a str>,
    /// For LSI pagination: base table SK for composite cursor.
    pub exclusive_start_base_sk: Option<&'a str>,
}
200
/// Parameters for inserting table metadata.
///
/// Each field is written to the `_tables` column of the same name by
/// `Storage::insert_table_metadata`. `None` fields are stored as SQL NULL.
/// Note that an explicit NULL overrides a column default such as
/// `billing_mode`'s `'PAY_PER_REQUEST'`.
#[derive(Debug, Default)]
pub struct CreateTableMetadata<'a> {
    /// DynamoDB table name (primary key of `_tables`).
    pub table_name: &'a str,
    /// Serialised key schema, stored as text.
    pub key_schema: &'a str,
    /// Serialised attribute definitions, stored as text.
    pub attribute_definitions: &'a str,
    /// Serialised GSI definitions (a JSON array with `IndexName` entries), if any.
    pub gsi_definitions: Option<&'a str>,
    /// Serialised LSI definitions (a JSON array with `IndexName` entries), if any.
    pub lsi_definitions: Option<&'a str>,
    /// Serialised provisioned throughput, if any.
    pub provisioned_throughput: Option<&'a str>,
    /// Creation timestamp; the unit is not visible here — presumably epoch-based, confirm.
    pub created_at: i64,
    /// Serialised SSE specification, if any.
    pub sse_specification: Option<&'a str>,
    /// Table class, if any.
    pub table_class: Option<&'a str>,
    /// Stored as `0`/`1` in `deletion_protection_enabled`.
    pub deletion_protection_enabled: bool,
    /// Billing mode, if any (see the NULL note above).
    pub billing_mode: Option<&'a str>,
}
216
/// Parameters for query operations (base table or GSI).
///
/// NOTE(review): `Default` sets `forward` to `false`. Callers using
/// `..Default::default()` must set it explicitly if they want the other order.
#[derive(Debug, Default)]
pub struct QueryParams<'a> {
    /// Optional sort-key condition; presumably an SQL fragment whose
    /// placeholders are bound from `sk_params` — confirm.
    pub sk_condition: Option<&'a str>,
    /// Values bound for `sk_condition`.
    pub sk_params: &'a [&'a str],
    /// Sort direction; presumably `true` means ascending
    /// (`ScanIndexForward`) — confirm.
    pub forward: bool,
    /// Optional cap on the number of items returned.
    pub limit: Option<usize>,
    /// Sort-key part of the pagination cursor.
    pub exclusive_start_sk: Option<&'a str>,
    /// For LSI pagination: base table PK for composite cursor.
    pub exclusive_start_base_pk: Option<&'a str>,
    /// For LSI pagination: base table SK for composite cursor.
    pub exclusive_start_base_sk: Option<&'a str>,
}
230
/// Low-level SQLite storage layer.
///
/// Owns the SQLite connection and:
/// - the `_config`, `_tables` and `_stream_records` metadata tables;
/// - one data table per DynamoDB table;
/// - one table per GSI/LSI (named `<table>::gsi::<index>` / `<table>::lsi::<index>`).
///
/// All SQL lives here — higher layers work with Rust types.
pub struct Storage {
    /// The single SQLite connection every operation goes through.
    conn: Connection,
    /// In-memory cache of table metadata, keyed by table name, to avoid
    /// repeated SQLite reads.
    ///
    /// `RefCell` makes `Storage` `!Sync`. Sharing is meant to go through
    /// `Arc<Mutex<>>`, which is what makes this safe — verify at construction sites.
    metadata_cache: RefCell<HashMap<String, TableMetadata>>,
}
241
242impl Storage {
243    /// Open a persistent database at the given path.
244    pub fn new(path: &str) -> Result<Self> {
245        let conn = Connection::open(path)?;
246        let mut storage = Self {
247            conn,
248            metadata_cache: RefCell::new(HashMap::new()),
249        };
250        storage.initialize().map_err(Self::maybe_encrypted_error)?;
251        Ok(storage)
252    }
253
254    /// If a SQLite error is SQLITE_NOTADB, return a clearer error message
255    /// suggesting the database may be encrypted.
256    fn maybe_encrypted_error(err: DynoxideError) -> DynoxideError {
257        if let DynoxideError::SqliteError(ref sqlite_err) = err {
258            if let Some(rusqlite::ErrorCode::NotADatabase) = sqlite_err.sqlite_error_code() {
259                return DynoxideError::InternalServerError(
260                    "Database file is encrypted or not a valid SQLite database. \
261                     If encrypted, enable the `encryption` or `encryption-cc` feature \
262                     and use Database::new_encrypted() with the correct key."
263                        .to_string(),
264                );
265            }
266        }
267        err
268    }
269
270    /// Open or create an encrypted persistent database at the given path.
271    ///
272    /// The key is passed to SQLCipher via `PRAGMA key`. The database file is
273    /// encrypted at rest using AES-256-CBC. A database opened without calling
274    /// `PRAGMA key` is treated as a normal unencrypted database by SQLCipher.
275    #[cfg(feature = "_has-encryption")]
276    pub fn new_encrypted(path: &str, key: &str) -> Result<Self> {
277        use zeroize::Zeroize;
278
279        let conn = Connection::open(path)?;
280        // Safety: key is validated to be exactly 64 hex characters [0-9a-fA-F]
281        // by Database::new_encrypted(), so no injection is possible in the
282        // x'...' hex literal format. Note: pragma_update does NOT use parameter
283        // binding for the PRAGMA value — hex validation is the sole injection defense.
284        let mut pragma_val = format!("x'{key}'");
285        conn.pragma_update(None, "key", &pragma_val)?;
286        pragma_val.zeroize();
287        // Verify the key works by reading from the database
288        conn.execute_batch("SELECT count(*) FROM sqlite_master;")?;
289        let mut storage = Self {
290            conn,
291            metadata_cache: RefCell::new(HashMap::new()),
292        };
293        storage.initialize()?;
294        Ok(storage)
295    }
296
297    /// Open an in-memory database (for tests and ephemeral use).
298    pub fn memory() -> Result<Self> {
299        let conn = Connection::open_in_memory()?;
300        let mut storage = Self {
301            conn,
302            metadata_cache: RefCell::new(HashMap::new()),
303        };
304        storage.initialize()?;
305        Ok(storage)
306    }
307
308    /// Initialize the database: WAL mode, metadata tables, config.
309    fn initialize(&mut self) -> Result<()> {
310        // Enable WAL mode for better concurrency
311        self.conn.pragma_update(None, "journal_mode", "WAL")?;
312
313        // Register FNV-1a hash function for parallel scan segment assignment.
314        // Uses get_raw() to borrow directly from SQLite's buffer (zero-copy).
315        self.conn.create_scalar_function(
316            "fnv1a_hash",
317            1,
318            rusqlite::functions::FunctionFlags::SQLITE_DETERMINISTIC
319                | rusqlite::functions::FunctionFlags::SQLITE_UTF8,
320            |ctx: &rusqlite::functions::Context| -> rusqlite::Result<i64> {
321                let pk_ref = ctx.get_raw(0);
322                let pk_bytes = match pk_ref {
323                    rusqlite::types::ValueRef::Text(bytes) => bytes,
324                    _ => {
325                        return Err(rusqlite::Error::InvalidFunctionParameterType(
326                            0,
327                            rusqlite::types::Type::Text,
328                        ));
329                    }
330                };
331                let mut hash: u32 = 2166136261;
332                for &byte in pk_bytes {
333                    hash ^= byte as u32;
334                    hash = hash.wrapping_mul(16777619);
335                }
336                Ok(hash as i64)
337            },
338        )?;
339
340        // Create metadata tables
341        self.conn.execute_batch(
342            "CREATE TABLE IF NOT EXISTS _config (
343                key TEXT PRIMARY KEY,
344                value TEXT NOT NULL
345            );
346
347            CREATE TABLE IF NOT EXISTS _tables (
348                table_name TEXT PRIMARY KEY,
349                key_schema TEXT NOT NULL,
350                attribute_definitions TEXT NOT NULL,
351                gsi_definitions TEXT,
352                lsi_definitions TEXT,
353                stream_enabled INTEGER DEFAULT 0,
354                stream_view_type TEXT,
355                stream_label TEXT,
356                ttl_attribute TEXT,
357                ttl_enabled INTEGER DEFAULT 0,
358                created_at INTEGER NOT NULL,
359                table_status TEXT NOT NULL DEFAULT 'ACTIVE',
360                billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
361                provisioned_throughput TEXT,
362                tags TEXT,
363                sse_specification TEXT,
364                table_class TEXT,
365                deletion_protection_enabled INTEGER DEFAULT 0
366            );
367
368            CREATE TABLE IF NOT EXISTS _stream_records (
369                id INTEGER PRIMARY KEY AUTOINCREMENT,
370                table_name TEXT NOT NULL,
371                event_name TEXT NOT NULL,
372                keys_json TEXT NOT NULL,
373                new_image TEXT,
374                old_image TEXT,
375                sequence_number TEXT NOT NULL,
376                shard_id TEXT NOT NULL,
377                created_at INTEGER NOT NULL,
378                user_identity TEXT
379            );",
380        )?;
381
382        // Migrate: add user_identity column if it doesn't exist (for databases created before Phase 11)
383        let _ = self
384            .conn
385            .execute_batch("ALTER TABLE _stream_records ADD COLUMN user_identity TEXT");
386
387        // Set schema version if not present
388        self.conn.execute(
389            "INSERT OR IGNORE INTO _config (key, value) VALUES ('schema_version', ?1)",
390            params![SCHEMA_VERSION],
391        )?;
392
393        // Run schema migrations
394        let version: i32 = self
395            .conn
396            .query_row(
397                "SELECT value FROM _config WHERE key = 'schema_version'",
398                [],
399                |r| r.get::<_, String>(0),
400            )
401            .unwrap_or_else(|_| "1".to_string())
402            .parse()
403            .unwrap_or(1);
404
405        if version < 2 {
406            self.migrate_v1_to_v2()?;
407        }
408        if version < 3 {
409            self.migrate_v2_to_v3()?;
410        }
411        if version < 4 {
412            self.migrate_v3_to_v4()?;
413        }
414        if version < 5 {
415            self.migrate_v4_to_v5()?;
416        }
417        if version < 6 {
418            self.migrate_v5_to_v6()?;
419        }
420
421        Ok(())
422    }
423
424    /// Migrate from schema v1 to v2: add `cached_at REAL` column to all data tables.
425    fn migrate_v1_to_v2(&self) -> Result<()> {
426        let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
427        let table_names: Vec<String> = stmt
428            .query_map([], |row| row.get(0))?
429            .collect::<std::result::Result<Vec<_>, _>>()?;
430
431        for table_name in &table_names {
432            let escaped = format!("\"{}\"", table_name.replace('"', "\"\""));
433            let _ = self.conn.execute(
434                &format!("ALTER TABLE {escaped} ADD COLUMN cached_at REAL"),
435                [],
436            );
437        }
438
439        self.conn.execute(
440            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '2')",
441            [],
442        )?;
443
444        Ok(())
445    }
446
447    /// Migrate from schema v2 to v3: add `tags TEXT` column to `_tables`.
448    fn migrate_v2_to_v3(&self) -> Result<()> {
449        let _ = self
450            .conn
451            .execute("ALTER TABLE _tables ADD COLUMN tags TEXT", []);
452
453        self.conn.execute(
454            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '3')",
455            [],
456        )?;
457
458        Ok(())
459    }
460
461    /// Migrate from schema v3 to v4: add SSE, table class, and deletion protection columns.
462    fn migrate_v3_to_v4(&self) -> Result<()> {
463        let _ = self
464            .conn
465            .execute("ALTER TABLE _tables ADD COLUMN sse_specification TEXT", []);
466        let _ = self
467            .conn
468            .execute("ALTER TABLE _tables ADD COLUMN table_class TEXT", []);
469        let _ = self.conn.execute(
470            "ALTER TABLE _tables ADD COLUMN deletion_protection_enabled INTEGER DEFAULT 0",
471            [],
472        );
473
474        self.conn.execute(
475            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '4')",
476            [],
477        )?;
478
479        Ok(())
480    }
481
482    /// Migrate from schema v4 to v5: add secondary indexes on GSI and LSI base-key columns.
483    fn migrate_v4_to_v5(&self) -> Result<()> {
484        let mut stmt = self
485            .conn
486            .prepare("SELECT table_name, gsi_definitions, lsi_definitions FROM _tables")?;
487        let tables: Vec<(String, Option<String>, Option<String>)> = stmt
488            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
489            .collect::<std::result::Result<Vec<_>, _>>()?;
490
491        for (table_name, gsi_json, lsi_json) in &tables {
492            if let Some(json) = gsi_json {
493                if let Ok(gsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
494                    for gsi in &gsis {
495                        if let Some(idx) = gsi.get("IndexName").and_then(|v| v.as_str()) {
496                            let gsi_table = escape_table_name(&format!("{table_name}::gsi::{idx}"));
497                            let idx_name =
498                                escape_table_name(&format!("{table_name}::gsi::{idx}::base_key"));
499                            let _ = self.conn.execute_batch(&format!(
500                                "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{gsi_table}\" (table_pk, table_sk)"
501                            ));
502                        }
503                    }
504                }
505            }
506            if let Some(json) = lsi_json {
507                if let Ok(lsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
508                    for lsi in &lsis {
509                        if let Some(idx) = lsi.get("IndexName").and_then(|v| v.as_str()) {
510                            let lsi_table = escape_table_name(&format!("{table_name}::lsi::{idx}"));
511                            let idx_name =
512                                escape_table_name(&format!("{table_name}::lsi::{idx}::base_key"));
513                            let _ = self.conn.execute_batch(&format!(
514                                "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{lsi_table}\" (base_pk, base_sk)"
515                            ));
516                        }
517                    }
518                }
519            }
520        }
521
522        self.conn.execute(
523            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '5')",
524            [],
525        )?;
526
527        Ok(())
528    }
529
530    /// Migrate from schema v5 to v6: add `hash_prefix TEXT` column to all data tables.
531    fn migrate_v5_to_v6(&self) -> Result<()> {
532        let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
533        let table_names: Vec<String> = stmt
534            .query_map([], |row| row.get(0))?
535            .collect::<std::result::Result<Vec<_>, _>>()?;
536
537        for table_name in &table_names {
538            let escaped = escape_table_name(table_name);
539            let _ = self.conn.execute(
540                &format!(
541                    "ALTER TABLE \"{escaped}\" ADD COLUMN hash_prefix TEXT NOT NULL DEFAULT ''"
542                ),
543                [],
544            );
545        }
546
547        self.conn.execute(
548            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '6')",
549            [],
550        )?;
551
552        Ok(())
553    }
554
    /// Get a reference to the underlying connection (for transactions, etc.).
    ///
    /// Writes to `_tables` made directly through this handle do not touch the
    /// in-memory metadata cache. Call `invalidate_metadata_cache` afterwards.
    pub fn conn(&self) -> &Connection {
        &self.conn
    }

    /// Get a mutable reference to the underlying connection.
    ///
    /// Needed for rusqlite APIs that take `&mut Connection`, such as
    /// `Connection::transaction`. The same cache caveat as `conn` applies.
    pub fn conn_mut(&mut self) -> &mut Connection {
        &mut self.conn
    }
564
565    // -----------------------------------------------------------------------
566    // Table metadata
567    // -----------------------------------------------------------------------
568
569    /// Insert a row into the `_tables` metadata table.
570    pub fn insert_table_metadata(&self, m: &CreateTableMetadata) -> Result<()> {
571        let table_name = m.table_name;
572        self.conn.execute(
573            "INSERT INTO _tables (table_name, key_schema, attribute_definitions, gsi_definitions, \
574             lsi_definitions, provisioned_throughput, created_at, sse_specification, table_class, \
575             deletion_protection_enabled, billing_mode)
576             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
577            params![
578                m.table_name,
579                m.key_schema,
580                m.attribute_definitions,
581                m.gsi_definitions,
582                m.lsi_definitions,
583                m.provisioned_throughput,
584                m.created_at,
585                m.sse_specification,
586                m.table_class,
587                m.deletion_protection_enabled as i32,
588                m.billing_mode,
589            ],
590        )?;
591        self.metadata_cache.borrow_mut().remove(table_name);
592        Ok(())
593    }
594
595    /// Get metadata for a table. Returns None if the table doesn't exist.
596    ///
597    /// Results are cached in memory. The cache is invalidated when metadata
598    /// is modified via `insert_table_metadata`, `delete_table_metadata`,
599    /// `enable_stream`, or `update_ttl_config`.
600    pub fn get_table_metadata(&self, table_name: &str) -> Result<Option<TableMetadata>> {
601        // Check cache first
602        if let Some(cached) = self.metadata_cache.borrow().get(table_name) {
603            return Ok(Some(cached.clone()));
604        }
605
606        let sql = format!("SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE table_name = ?1");
607        let mut stmt = self.conn.prepare(&sql)?;
608
609        let result = stmt.query_row(params![table_name], row_to_metadata);
610
611        match result {
612            Ok(meta) => {
613                self.metadata_cache
614                    .borrow_mut()
615                    .insert(table_name.to_string(), meta.clone());
616                Ok(Some(meta))
617            }
618            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
619            Err(e) => Err(DynoxideError::from(e)),
620        }
621    }
622
623    /// Delete metadata for a table.
624    pub fn delete_table_metadata(&self, table_name: &str) -> Result<bool> {
625        let affected = self.conn.execute(
626            "DELETE FROM _tables WHERE table_name = ?1",
627            params![table_name],
628        )?;
629        self.metadata_cache.borrow_mut().remove(table_name);
630        Ok(affected > 0)
631    }
632
633    /// Update attribute definitions and GSI definitions for a table.
634    pub fn update_table_metadata(
635        &self,
636        table_name: &str,
637        attribute_definitions: &str,
638        gsi_definitions: Option<&str>,
639    ) -> Result<()> {
640        self.conn.execute(
641            "UPDATE _tables SET attribute_definitions = ?1, gsi_definitions = ?2 WHERE table_name = ?3",
642            params![attribute_definitions, gsi_definitions, table_name],
643        )?;
644        self.metadata_cache.borrow_mut().remove(table_name);
645        Ok(())
646    }
647
648    /// Update provisioned throughput for a table.
649    pub fn update_provisioned_throughput(
650        &self,
651        table_name: &str,
652        provisioned_throughput: &str,
653    ) -> Result<()> {
654        self.conn.execute(
655            "UPDATE _tables SET provisioned_throughput = ?1 WHERE table_name = ?2",
656            params![provisioned_throughput, table_name],
657        )?;
658        self.metadata_cache.borrow_mut().remove(table_name);
659        Ok(())
660    }
661
662    /// Clear provisioned throughput for a table (sets to SQL NULL).
663    pub fn clear_provisioned_throughput(&self, table_name: &str) -> Result<()> {
664        self.conn.execute(
665            "UPDATE _tables SET provisioned_throughput = NULL WHERE table_name = ?1",
666            params![table_name],
667        )?;
668        self.metadata_cache.borrow_mut().remove(table_name);
669        Ok(())
670    }
671
672    /// Update billing mode for a table.
673    pub fn update_billing_mode(&self, table_name: &str, billing_mode: &str) -> Result<()> {
674        self.conn.execute(
675            "UPDATE _tables SET billing_mode = ?1 WHERE table_name = ?2",
676            params![billing_mode, table_name],
677        )?;
678        self.metadata_cache.borrow_mut().remove(table_name);
679        Ok(())
680    }
681
682    // -----------------------------------------------------------------------
683    // Tag operations
684    // -----------------------------------------------------------------------
685
686    /// Get tags for a table.
687    pub fn get_tags(&self, table_name: &str) -> Result<Vec<crate::types::Tag>> {
688        let tags_json: Option<String> = self.conn.query_row(
689            "SELECT tags FROM _tables WHERE table_name = ?1",
690            params![table_name],
691            |row| row.get(0),
692        )?;
693
694        match tags_json {
695            Some(json) => serde_json::from_str(&json)
696                .map_err(|e| DynoxideError::InternalServerError(format!("Bad tags JSON: {e}"))),
697            None => Ok(Vec::new()),
698        }
699    }
700
701    /// Set (merge) tags on a table. New keys overwrite existing keys.
702    pub fn set_tags(&self, table_name: &str, new_tags: &[crate::types::Tag]) -> Result<()> {
703        use std::collections::BTreeMap;
704
705        let existing = self.get_tags(table_name)?;
706        let mut tag_map: BTreeMap<String, String> =
707            existing.into_iter().map(|t| (t.key, t.value)).collect();
708
709        for tag in new_tags {
710            tag_map.insert(tag.key.clone(), tag.value.clone());
711        }
712
713        if tag_map.len() > 50 {
714            return Err(DynoxideError::ValidationException(
715                "One or more parameter values were invalid: \
716                 Too many tags: tag limit is 50"
717                    .to_string(),
718            ));
719        }
720
721        let merged: Vec<crate::types::Tag> = tag_map
722            .into_iter()
723            .map(|(k, v)| crate::types::Tag { key: k, value: v })
724            .collect();
725
726        let json = serde_json::to_string(&merged)
727            .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
728
729        self.conn.execute(
730            "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
731            params![json, table_name],
732        )?;
733        Ok(())
734    }
735
736    /// Update the deletion protection setting for a table.
737    pub fn update_deletion_protection(&self, table_name: &str, enabled: bool) -> Result<()> {
738        self.conn.execute(
739            "UPDATE _tables SET deletion_protection_enabled = ?1 WHERE table_name = ?2",
740            params![enabled as i32, table_name],
741        )?;
742        self.metadata_cache.borrow_mut().remove(table_name);
743        Ok(())
744    }
745
746    /// Remove tags by key from a table.
747    pub fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<()> {
748        let mut tags = self.get_tags(table_name)?;
749        tags.retain(|t| !keys.contains(&t.key));
750
751        let json = if tags.is_empty() {
752            None
753        } else {
754            Some(
755                serde_json::to_string(&tags)
756                    .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
757            )
758        };
759
760        self.conn.execute(
761            "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
762            params![json, table_name],
763        )?;
764        Ok(())
765    }
766
767    /// List all table names.
768    pub fn list_table_names(&self) -> Result<Vec<String>> {
769        let mut stmt = self
770            .conn
771            .prepare("SELECT table_name FROM _tables ORDER BY table_name")?;
772        let names = stmt
773            .query_map([], |row| row.get(0))?
774            .collect::<std::result::Result<Vec<String>, _>>()?;
775        Ok(names)
776    }
777
778    /// Check if a table exists in metadata.
779    pub fn table_exists(&self, table_name: &str) -> Result<bool> {
780        let count: i32 = self.conn.query_row(
781            "SELECT COUNT(*) FROM _tables WHERE table_name = ?1",
782            params![table_name],
783            |row| row.get(0),
784        )?;
785        Ok(count > 0)
786    }
787
    /// Invalidate the cached metadata for a specific table.
    ///
    /// The next `get_table_metadata` call for `table_name` re-reads `_tables`.
    /// Use this after modifying `_tables` through `conn()` / `conn_mut()`
    /// instead of the metadata helpers.
    // `dead_code` is allowed because this helper has had no caller in some
    // builds — NOTE(review): confirm the allow is still needed.
    #[allow(dead_code)]
    pub(crate) fn invalidate_metadata_cache(&self, table_name: &str) {
        self.metadata_cache.borrow_mut().remove(table_name);
    }
793
794    // -----------------------------------------------------------------------
795    // Dynamic data tables
796    // -----------------------------------------------------------------------
797
798    /// Create a data table for a DynamoDB table.
799    pub fn create_data_table(&self, table_name: &str) -> Result<()> {
800        let sql = format!(
801            "CREATE TABLE \"{}\" (
802                pk TEXT NOT NULL,
803                sk TEXT NOT NULL DEFAULT '',
804                item_json TEXT NOT NULL,
805                item_size INTEGER NOT NULL,
806                cached_at REAL,
807                hash_prefix TEXT NOT NULL DEFAULT '',
808                PRIMARY KEY (pk, sk)
809            )",
810            escape_table_name(table_name)
811        );
812        self.conn.execute(&sql, [])?;
813        Ok(())
814    }
815
816    /// Drop a data table.
817    pub fn drop_data_table(&self, table_name: &str) -> Result<()> {
818        let sql = format!("DROP TABLE IF EXISTS \"{}\"", escape_table_name(table_name));
819        self.conn.execute(&sql, [])?;
820        Ok(())
821    }
822
823    /// Create a GSI table.
824    pub fn create_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
825        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
826        let escaped = escape_table_name(&gsi_table_name);
827        let sql = format!(
828            "CREATE TABLE \"{escaped}\" (
829                gsi_pk TEXT NOT NULL,
830                gsi_sk TEXT NOT NULL DEFAULT '',
831                table_pk TEXT NOT NULL,
832                table_sk TEXT NOT NULL DEFAULT '',
833                item_json TEXT NOT NULL,
834                PRIMARY KEY (gsi_pk, gsi_sk, table_pk, table_sk)
835            )"
836        );
837        self.conn.execute(&sql, [])?;
838
839        let idx_name = escape_table_name(&format!("{gsi_table_name}::base_key"));
840        self.conn.execute_batch(&format!(
841            "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{escaped}\" (table_pk, table_sk)"
842        ))?;
843        Ok(())
844    }
845
846    /// Drop a GSI table.
847    pub fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
848        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
849        let sql = format!(
850            "DROP TABLE IF EXISTS \"{}\"",
851            escape_table_name(&gsi_table_name)
852        );
853        self.conn.execute(&sql, [])?;
854        Ok(())
855    }
856
857    // -----------------------------------------------------------------------
858    // GSI item operations
859    // -----------------------------------------------------------------------
860
861    /// Insert an item into a GSI table.
862    #[allow(clippy::too_many_arguments)]
863    pub fn insert_gsi_item(
864        &self,
865        table_name: &str,
866        index_name: &str,
867        gsi_pk: &str,
868        gsi_sk: &str,
869        table_pk: &str,
870        table_sk: &str,
871        item_json: &str,
872    ) -> Result<()> {
873        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
874        let sql = format!(
875            "INSERT OR REPLACE INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
876            escape_table_name(&gsi_table_name)
877        );
878        self.conn
879            .prepare_cached(&sql)?
880            .execute(params![gsi_pk, gsi_sk, table_pk, table_sk, item_json])?;
881        Ok(())
882    }
883
884    /// Delete an item from a GSI table by base table primary key.
885    pub fn delete_gsi_item(
886        &self,
887        table_name: &str,
888        index_name: &str,
889        table_pk: &str,
890        table_sk: &str,
891    ) -> Result<()> {
892        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
893        let sql = format!(
894            "DELETE FROM \"{}\" WHERE table_pk = ?1 AND table_sk = ?2",
895            escape_table_name(&gsi_table_name)
896        );
897        self.conn
898            .prepare_cached(&sql)?
899            .execute(params![table_pk, table_sk])?;
900        Ok(())
901    }
902
    /// Query items from a GSI table for a single GSI partition key.
    ///
    /// Selects `(gsi_pk, gsi_sk, item_json)` from the
    /// `"{table_name}::gsi::{index_name}"` table where `gsi_pk = ?1`. It then
    /// applies the optional parts of `params`:
    /// - `sk_condition` / `sk_params`: a sort-key SQL fragment written against
    ///   `sk`. It is rewritten here to target `gsi_sk`. Its values are bound
    ///   after `gsi_pk`, so its placeholders start at `?2`.
    /// - Exclusive-start cursor. When `exclusive_start_sk`,
    ///   `exclusive_start_base_pk` and `exclusive_start_base_sk` are all set,
    ///   rows strictly after `(gsi_sk, table_pk, table_sk)` are returned
    ///   (strictly before them when scanning backward). When only
    ///   `exclusive_start_sk` is set, just `gsi_sk` is compared.
    /// - `forward`: chooses ascending or descending order.
    /// - `limit`: caps the row count.
    ///
    /// Returns the matching tuples in the same order the cursor assumes.
    pub fn query_gsi_items(
        &self,
        table_name: &str,
        index_name: &str,
        gsi_pk: &str,
        params: &QueryParams,
    ) -> Result<Vec<(String, String, String)>> {
        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
        let mut sql = format!(
            "SELECT gsi_pk, gsi_sk, item_json FROM \"{}\" WHERE gsi_pk = ?1",
            escape_table_name(&gsi_table_name)
        );

        // ?1 is gsi_pk; every later placeholder is numbered from param_idx, and
        // all_params must be pushed in exactly the same order.
        let mut param_idx = 2;
        let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> =
            vec![Box::new(gsi_pk.to_string())];

        if let Some(cond) = params.sk_condition {
            // Replace "sk" with "gsi_sk" in the condition.
            // N.B. This string replacement works because all SQL fragments generated
            // by key_condition use the form ` sk ` or ` sk>` — they never embed
            // `sk` without surrounding whitespace/operator. A more robust solution
            // would thread the column name through the key condition builder, but
            // that is a larger refactor deferred for now.
            let gsi_cond = cond.replace(" sk ", " gsi_sk ").replace(" sk>", " gsi_sk>");
            sql.push(' ');
            sql.push_str(&gsi_cond);
            for &p in params.sk_params {
                all_params.push(Box::new(p.to_string()));
                param_idx += 1;
            }
        }

        // For GSI pagination, use a composite cursor (gsi_sk, table_pk, table_sk)
        // so that hash-only GSIs (where gsi_sk is always '') paginate correctly.
        if let (Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
            params.exclusive_start_sk,
            params.exclusive_start_base_pk,
            params.exclusive_start_base_sk,
        ) {
            let op = if params.forward { ">" } else { "<" };
            sql.push_str(&format!(
                " AND (gsi_sk, table_pk, table_sk) {op} (?{}, ?{}, ?{})",
                param_idx,
                param_idx + 1,
                param_idx + 2
            ));
            all_params.push(Box::new(start_sk.to_string()));
            all_params.push(Box::new(start_base_pk.to_string()));
            all_params.push(Box::new(start_base_sk.to_string()));
        } else if let Some(start_sk) = params.exclusive_start_sk {
            // Fallback when the base-key parts of the cursor are missing.
            // NOTE(review): a strict comparison on gsi_sk alone also excludes
            // not-yet-returned rows that share the start gsi_sk but have a later
            // (table_pk, table_sk).
            if params.forward {
                sql.push_str(&format!(" AND gsi_sk > ?{param_idx}"));
            } else {
                sql.push_str(&format!(" AND gsi_sk < ?{param_idx}"));
            }
            all_params.push(Box::new(start_sk.to_string()));
        }

        // The ordering lists the same columns as the composite cursor above.
        sql.push_str(if params.forward {
            " ORDER BY gsi_sk ASC, table_pk ASC, table_sk ASC"
        } else {
            " ORDER BY gsi_sk DESC, table_pk DESC, table_sk DESC"
        });

        if let Some(lim) = params.limit {
            sql.push_str(&format!(" LIMIT {lim}"));
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            all_params.iter().map(|p| p.as_ref()).collect();
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt
            .query_map(param_refs.as_slice(), |row| {
                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
            })?
            .collect::<std::result::Result<Vec<_>, _>>()?;

        Ok(rows)
    }
984
    /// Scan all items from a GSI table in ascending key order.
    ///
    /// Selects `(gsi_pk, gsi_sk, item_json)` from the
    /// `"{table_name}::gsi::{index_name}"` table, honouring the optional parts
    /// of `params`:
    /// - Exclusive-start cursor. It covers the full GSI primary key
    ///   `(gsi_pk, gsi_sk, table_pk, table_sk)` when the base-table key is
    ///   supplied, and only `(gsi_pk, gsi_sk)` otherwise.
    /// - `segment` / `total_segments`: a parallel-scan filter of the form
    ///   `fnv1a_hash(table_pk) % total = segment`.
    /// - `limit`: caps the row count.
    pub fn scan_gsi_items(
        &self,
        table_name: &str,
        index_name: &str,
        params: &ScanParams,
    ) -> Result<Vec<(String, String, String)>> {
        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
        let mut sql = format!(
            "SELECT gsi_pk, gsi_sk, item_json FROM \"{}\"",
            escape_table_name(&gsi_table_name)
        );

        // Placeholders are numbered from param_idx; params_vec must be pushed in
        // the same order as the clauses that reference them.
        let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut where_clauses = Vec::new();
        let mut param_idx = 1;

        if let (Some(start_pk), Some(start_sk)) =
            (params.exclusive_start_pk, params.exclusive_start_sk)
        {
            // The GSI table's primary key is (gsi_pk, gsi_sk, table_pk, table_sk).
            // Using only (gsi_pk, gsi_sk) for the cursor skips rows that share the
            // same GSI key but have different base table keys.
            if let (Some(base_pk), Some(base_sk)) = (
                params.exclusive_start_base_pk,
                params.exclusive_start_base_sk,
            ) {
                where_clauses.push(format!(
                    "(gsi_pk, gsi_sk, table_pk, table_sk) > (?{}, ?{}, ?{}, ?{})",
                    param_idx,
                    param_idx + 1,
                    param_idx + 2,
                    param_idx + 3
                ));
                params_vec.push(Box::new(start_pk.to_string()));
                params_vec.push(Box::new(start_sk.to_string()));
                params_vec.push(Box::new(base_pk.to_string()));
                params_vec.push(Box::new(base_sk.to_string()));
                param_idx += 4;
            } else {
                where_clauses.push(format!(
                    "(gsi_pk, gsi_sk) > (?{}, ?{})",
                    param_idx,
                    param_idx + 1
                ));
                params_vec.push(Box::new(start_pk.to_string()));
                params_vec.push(Box::new(start_sk.to_string()));
                param_idx += 2;
            }
        }

        // For GSI parallel scan, hash on the base table pk (table_pk column)
        // NOTE(review): assumes `fnv1a_hash` is a SQL function registered on this
        // connection elsewhere; confirm its result is non-negative so the
        // modulo lands in 0..total.
        if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
            where_clauses.push(format!(
                "(fnv1a_hash(table_pk) % ?{}) = ?{}",
                param_idx,
                param_idx + 1
            ));
            params_vec.push(Box::new(total as i64));
            params_vec.push(Box::new(seg as i64));
        }

        if !where_clauses.is_empty() {
            sql.push_str(" WHERE ");
            sql.push_str(&where_clauses.join(" AND "));
        }

        // Order by the full primary key so the 4-column cursor resumes exactly.
        sql.push_str(" ORDER BY gsi_pk ASC, gsi_sk ASC, table_pk ASC, table_sk ASC");

        if let Some(lim) = params.limit {
            sql.push_str(&format!(" LIMIT {lim}"));
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            params_vec.iter().map(|p| p.as_ref()).collect();
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt
            .query_map(param_refs.as_slice(), |row| {
                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
            })?
            .collect::<std::result::Result<Vec<_>, _>>()?;

        Ok(rows)
    }
1069
1070    // -----------------------------------------------------------------------
1071    // LSI table operations
1072    // -----------------------------------------------------------------------
1073
1074    /// Create an LSI table for a given base table and index name.
1075    pub fn create_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1076        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1077        let escaped = escape_table_name(&lsi_table_name);
1078        let sql = format!(
1079            "CREATE TABLE \"{escaped}\" (
1080                pk TEXT NOT NULL,
1081                sk TEXT NOT NULL DEFAULT '',
1082                base_pk TEXT NOT NULL,
1083                base_sk TEXT NOT NULL DEFAULT '',
1084                item_json TEXT NOT NULL,
1085                PRIMARY KEY (pk, sk, base_pk, base_sk)
1086            )"
1087        );
1088        self.conn.execute(&sql, [])?;
1089
1090        let idx_name = escape_table_name(&format!("{lsi_table_name}::base_key"));
1091        self.conn.execute_batch(&format!(
1092            "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{escaped}\" (base_pk, base_sk)"
1093        ))?;
1094        Ok(())
1095    }
1096
1097    /// Drop an LSI table.
1098    pub fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1099        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1100        let sql = format!(
1101            "DROP TABLE IF EXISTS \"{}\"",
1102            escape_table_name(&lsi_table_name)
1103        );
1104        self.conn.execute(&sql, [])?;
1105        Ok(())
1106    }
1107
1108    // -----------------------------------------------------------------------
1109    // LSI item operations
1110    // -----------------------------------------------------------------------
1111
1112    /// Insert an item into an LSI table.
1113    #[allow(clippy::too_many_arguments)]
1114    pub fn insert_lsi_item(
1115        &self,
1116        table_name: &str,
1117        index_name: &str,
1118        pk: &str,
1119        sk: &str,
1120        base_pk: &str,
1121        base_sk: &str,
1122        item_json: &str,
1123    ) -> Result<()> {
1124        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1125        let sql = format!(
1126            "INSERT OR REPLACE INTO \"{}\" (pk, sk, base_pk, base_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
1127            escape_table_name(&lsi_table_name)
1128        );
1129        self.conn
1130            .prepare_cached(&sql)?
1131            .execute(params![pk, sk, base_pk, base_sk, item_json])?;
1132        Ok(())
1133    }
1134
1135    /// Delete an item from an LSI table by base table primary key.
1136    pub fn delete_lsi_item(
1137        &self,
1138        table_name: &str,
1139        index_name: &str,
1140        base_pk: &str,
1141        base_sk: &str,
1142    ) -> Result<()> {
1143        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1144        let sql = format!(
1145            "DELETE FROM \"{}\" WHERE base_pk = ?1 AND base_sk = ?2",
1146            escape_table_name(&lsi_table_name)
1147        );
1148        self.conn
1149            .prepare_cached(&sql)?
1150            .execute(params![base_pk, base_sk])?;
1151        Ok(())
1152    }
1153
    /// Query items from an LSI table for a single partition key.
    ///
    /// Selects `(pk, sk, item_json)` from the `"{table_name}::lsi::{index_name}"`
    /// table where `pk = ?1`. It then applies the optional parts of `params`:
    /// - `sk_condition` / `sk_params`: a sort-key SQL fragment appended verbatim.
    ///   Its values are bound after `pk`, so its placeholders start at `?2`.
    /// - Exclusive-start cursor. When `exclusive_start_sk`,
    ///   `exclusive_start_base_pk` and `exclusive_start_base_sk` are all set,
    ///   rows strictly after `(sk, base_pk, base_sk)` are returned (strictly
    ///   before them when scanning backward). With only `exclusive_start_sk`,
    ///   just `sk` is compared.
    /// - `forward`: ascending or descending order.
    /// - `limit`: caps the row count.
    pub fn query_lsi_items(
        &self,
        table_name: &str,
        index_name: &str,
        pk: &str,
        params: &QueryParams,
    ) -> Result<Vec<(String, String, String)>> {
        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
        let mut sql = format!(
            "SELECT pk, sk, item_json FROM \"{}\" WHERE pk = ?1",
            escape_table_name(&lsi_table_name)
        );

        // ?1 is pk; later placeholders are numbered from param_idx, and
        // all_params must be pushed in the same order they appear in the SQL.
        let mut param_idx = 2;
        let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(pk.to_string())];

        if let Some(cond) = params.sk_condition {
            sql.push(' ');
            sql.push_str(cond);
            for &p in params.sk_params {
                all_params.push(Box::new(p.to_string()));
                param_idx += 1;
            }
        }

        // Use composite cursor when all three LSI pagination values are present,
        // otherwise fall back to simple sk comparison.
        if let (Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
            params.exclusive_start_sk,
            params.exclusive_start_base_pk,
            params.exclusive_start_base_sk,
        ) {
            let op = if params.forward { ">" } else { "<" };
            sql.push_str(&format!(
                " AND (sk, base_pk, base_sk) {op} (?{}, ?{}, ?{})",
                param_idx,
                param_idx + 1,
                param_idx + 2
            ));
            all_params.push(Box::new(start_sk.to_string()));
            all_params.push(Box::new(start_base_pk.to_string()));
            all_params.push(Box::new(start_base_sk.to_string()));
        } else if let Some(start_sk) = params.exclusive_start_sk {
            // NOTE(review): comparing sk alone also excludes not-yet-returned rows
            // that share the start sk but have a later base key.
            if params.forward {
                sql.push_str(&format!(" AND sk > ?{param_idx}"));
            } else {
                sql.push_str(&format!(" AND sk < ?{param_idx}"));
            }
            all_params.push(Box::new(start_sk.to_string()));
        }

        // The ordering lists the same columns as the composite cursor above.
        sql.push_str(if params.forward {
            " ORDER BY sk ASC, base_pk ASC, base_sk ASC"
        } else {
            " ORDER BY sk DESC, base_pk DESC, base_sk DESC"
        });

        if let Some(lim) = params.limit {
            sql.push_str(&format!(" LIMIT {lim}"));
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            all_params.iter().map(|p| p.as_ref()).collect();
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt
            .query_map(param_refs.as_slice(), |row| {
                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
            })?
            .collect::<std::result::Result<Vec<_>, _>>()?;

        Ok(rows)
    }
1227
1228    /// Scan all items from an LSI table.
1229    pub fn scan_lsi_items(
1230        &self,
1231        table_name: &str,
1232        index_name: &str,
1233        params: &ScanParams,
1234    ) -> Result<Vec<(String, String, String)>> {
1235        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1236        let mut sql = format!(
1237            "SELECT pk, sk, item_json FROM \"{}\"",
1238            escape_table_name(&lsi_table_name)
1239        );
1240
1241        let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1242        let mut where_clauses = Vec::new();
1243        let mut param_idx = 1;
1244
1245        // Use composite cursor when base key values are present for correct LSI pagination.
1246        if let (Some(start_pk), Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
1247            params.exclusive_start_pk,
1248            params.exclusive_start_sk,
1249            params.exclusive_start_base_pk,
1250            params.exclusive_start_base_sk,
1251        ) {
1252            where_clauses.push(format!(
1253                "(pk, sk, base_pk, base_sk) > (?{}, ?{}, ?{}, ?{})",
1254                param_idx,
1255                param_idx + 1,
1256                param_idx + 2,
1257                param_idx + 3
1258            ));
1259            params_vec.push(Box::new(start_pk.to_string()));
1260            params_vec.push(Box::new(start_sk.to_string()));
1261            params_vec.push(Box::new(start_base_pk.to_string()));
1262            params_vec.push(Box::new(start_base_sk.to_string()));
1263            param_idx += 4;
1264        } else if let (Some(start_pk), Some(start_sk)) =
1265            (params.exclusive_start_pk, params.exclusive_start_sk)
1266        {
1267            where_clauses.push(format!("(pk, sk) > (?{}, ?{})", param_idx, param_idx + 1));
1268            params_vec.push(Box::new(start_pk.to_string()));
1269            params_vec.push(Box::new(start_sk.to_string()));
1270            param_idx += 2;
1271        }
1272
1273        // For LSI parallel scan, hash on the base table pk (base_pk column)
1274        if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
1275            where_clauses.push(format!(
1276                "(fnv1a_hash(base_pk) % ?{}) = ?{}",
1277                param_idx,
1278                param_idx + 1
1279            ));
1280            params_vec.push(Box::new(total as i64));
1281            params_vec.push(Box::new(seg as i64));
1282        }
1283
1284        if !where_clauses.is_empty() {
1285            sql.push_str(" WHERE ");
1286            sql.push_str(&where_clauses.join(" AND "));
1287        }
1288
1289        sql.push_str(" ORDER BY pk ASC, sk ASC");
1290
1291        if let Some(lim) = params.limit {
1292            sql.push_str(&format!(" LIMIT {lim}"));
1293        }
1294
1295        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1296            params_vec.iter().map(|p| p.as_ref()).collect();
1297        let mut stmt = self.conn.prepare(&sql)?;
1298        let rows = stmt
1299            .query_map(param_refs.as_slice(), |row| {
1300                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1301            })?
1302            .collect::<std::result::Result<Vec<_>, _>>()?;
1303
1304        Ok(rows)
1305    }
1306
1307    // -----------------------------------------------------------------------
1308    // Transaction support
1309    // -----------------------------------------------------------------------
1310
1311    /// Begin an immediate SQLite transaction.
1312    pub fn begin_transaction(&self) -> Result<()> {
1313        self.conn.execute_batch("BEGIN IMMEDIATE")?;
1314        Ok(())
1315    }
1316
1317    /// Commit the current transaction.
1318    pub fn commit(&self) -> Result<()> {
1319        self.conn.execute_batch("COMMIT")?;
1320        Ok(())
1321    }
1322
1323    /// Rollback the current transaction.
1324    pub fn rollback(&self) -> Result<()> {
1325        self.conn.execute_batch("ROLLBACK")?;
1326        Ok(())
1327    }
1328
1329    /// Set aggressive PRAGMAs for bulk loading.
1330    ///
1331    /// Disables fsync, increases cache, and enables memory-mapped I/O.
1332    /// Only safe when data loss on crash is acceptable (e.g., fresh import
1333    /// that can be re-run).
1334    pub fn enable_bulk_loading(&self) -> Result<()> {
1335        self.conn.execute_batch(
1336            "PRAGMA synchronous = OFF;
1337             PRAGMA cache_size = -64000;
1338             PRAGMA temp_store = MEMORY;
1339             PRAGMA mmap_size = 268435456;",
1340        )?;
1341        Ok(())
1342    }
1343
1344    /// Restore normal PRAGMAs after bulk loading.
1345    pub fn disable_bulk_loading(&self) -> Result<()> {
1346        self.conn.execute_batch(
1347            "PRAGMA synchronous = NORMAL;
1348             PRAGMA cache_size = -2000;
1349             PRAGMA temp_store = DEFAULT;
1350             PRAGMA mmap_size = 0;",
1351        )?;
1352        Ok(())
1353    }
1354
1355    // -----------------------------------------------------------------------
1356    // Item CRUD
1357    // -----------------------------------------------------------------------
1358
1359    /// Insert or replace an item.
1360    pub fn put_item(
1361        &self,
1362        table_name: &str,
1363        pk: &str,
1364        sk: &str,
1365        item_json: &str,
1366        item_size: usize,
1367    ) -> Result<Option<String>> {
1368        self.put_item_with_hash(table_name, pk, sk, item_json, item_size, "")
1369    }
1370
1371    /// Put an item with an explicit hash prefix for parallel scan ordering.
1372    pub fn put_item_with_hash(
1373        &self,
1374        table_name: &str,
1375        pk: &str,
1376        sk: &str,
1377        item_json: &str,
1378        item_size: usize,
1379        hash_prefix: &str,
1380    ) -> Result<Option<String>> {
1381        // First, try to get the old item for return value
1382        let old_item = self.get_item(table_name, pk, sk)?;
1383
1384        let escaped = escape_table_name(table_name);
1385        let sql = format!(
1386            "INSERT OR REPLACE INTO \"{escaped}\" (pk, sk, item_json, item_size, cached_at, hash_prefix) \
1387             VALUES (?1, ?2, ?3, ?4, \
1388             (SELECT cached_at FROM \"{escaped}\" WHERE pk = ?1 AND sk = ?2), ?5)"
1389        );
1390        self.conn.execute(
1391            &sql,
1392            params![pk, sk, item_json, item_size as i64, hash_prefix],
1393        )?;
1394
1395        Ok(old_item)
1396    }
1397
1398    /// Get a single item by primary key.
1399    pub fn get_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1400        let sql = format!(
1401            "SELECT item_json FROM \"{}\" WHERE pk = ?1 AND sk = ?2",
1402            escape_table_name(table_name)
1403        );
1404        let result = self.conn.query_row(&sql, params![pk, sk], |row| row.get(0));
1405
1406        match result {
1407            Ok(json) => Ok(Some(json)),
1408            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1409            Err(e) => Err(DynoxideError::from(e)),
1410        }
1411    }
1412
1413    /// Return the total item_size for all items sharing the given partition key.
1414    pub fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64> {
1415        let sql = format!(
1416            "SELECT COALESCE(SUM(item_size), 0) FROM \"{}\" WHERE pk = ?1",
1417            escape_table_name(table_name)
1418        );
1419        let size: i64 = self.conn.query_row(&sql, params![pk], |row| row.get(0))?;
1420        Ok(size)
1421    }
1422
1423    /// Return the total size of LSI items for a given partition key.
1424    /// LSI items are stored as JSON text, so we use length(item_json).
1425    pub fn get_lsi_partition_size(
1426        &self,
1427        table_name: &str,
1428        index_name: &str,
1429        pk: &str,
1430    ) -> Result<i64> {
1431        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1432        let sql = format!(
1433            "SELECT COALESCE(SUM(length(item_json)), 0) FROM \"{}\" WHERE pk = ?1",
1434            escape_table_name(&lsi_table_name)
1435        );
1436        let size: i64 = self.conn.query_row(&sql, params![pk], |row| row.get(0))?;
1437        Ok(size)
1438    }
1439
1440    /// Delete an item by primary key. Returns the old item_json if it existed.
1441    pub fn delete_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1442        let old_item = self.get_item(table_name, pk, sk)?;
1443
1444        let sql = format!(
1445            "DELETE FROM \"{}\" WHERE pk = ?1 AND sk = ?2",
1446            escape_table_name(table_name)
1447        );
1448        self.conn.execute(&sql, params![pk, sk])?;
1449
1450        Ok(old_item)
1451    }
1452
1453    /// Query items by partition key with optional sort key condition.
1454    ///
1455    /// `sk_condition` is a SQL fragment like `AND sk > ?` with `sk_params` providing values.
1456    /// Returns `(items, has_more)` where items is a vec of `(pk, sk, item_json)`.
1457    pub fn query_items(
1458        &self,
1459        table_name: &str,
1460        pk: &str,
1461        params: &QueryParams,
1462    ) -> Result<Vec<(String, String, String)>> {
1463        let mut sql = format!(
1464            "SELECT pk, sk, item_json FROM \"{}\" WHERE pk = ?1",
1465            escape_table_name(table_name)
1466        );
1467
1468        let mut param_idx = 2;
1469        let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(pk.to_string())];
1470
1471        if let Some(cond) = params.sk_condition {
1472            sql.push(' ');
1473            sql.push_str(cond);
1474            for &p in params.sk_params {
1475                all_params.push(Box::new(p.to_string()));
1476                param_idx += 1;
1477            }
1478        }
1479
1480        if let Some(start_sk) = params.exclusive_start_sk {
1481            if params.forward {
1482                sql.push_str(&format!(" AND sk > ?{param_idx}"));
1483            } else {
1484                sql.push_str(&format!(" AND sk < ?{param_idx}"));
1485            }
1486            all_params.push(Box::new(start_sk.to_string()));
1487        }
1488
1489        sql.push_str(if params.forward {
1490            " ORDER BY sk ASC"
1491        } else {
1492            " ORDER BY sk DESC"
1493        });
1494
1495        if let Some(lim) = params.limit {
1496            sql.push_str(&format!(" LIMIT {lim}"));
1497        }
1498
1499        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1500            all_params.iter().map(|p| p.as_ref()).collect();
1501        let mut stmt = self.conn.prepare(&sql)?;
1502        let rows = stmt
1503            .query_map(param_refs.as_slice(), |row| {
1504                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1505            })?
1506            .collect::<std::result::Result<Vec<_>, _>>()?;
1507
1508        Ok(rows)
1509    }
1510
    /// Scan items from a table with pagination.
    ///
    /// Returns `(pk, sk, item_json)` tuples.
    ///
    /// A regular scan is ordered by `(pk, sk)`. A parallel scan has both
    /// `segment` and `total_segments` set. It is ordered by
    /// `(hash_prefix, pk, sk)` for dynalite-compatible behaviour, and it keeps
    /// only rows whose 3-hex-digit bucket (`substr(hash_prefix, 1, 3)`) falls
    /// in the segment's range.
    ///
    /// The exclusive-start cursor `(exclusive_start_pk, exclusive_start_sk)`
    /// is compared as `(pk, sk)` for regular scans. For parallel scans it is
    /// compared as `(hash_prefix, pk, sk)`, with the start item's `hash_prefix`
    /// looked up by subquery.
    ///
    /// NOTE(review): consider the parallel-scan cursor when the start item no
    /// longer exists. The subquery then yields NULL, and the row-value
    /// comparison against NULL filters out every row, which ends the scan
    /// early. Confirm whether callers guarantee the start key still exists.
    ///
    /// NOTE(review): rows stored with an empty `hash_prefix` fall outside every
    /// segment range, because `''` sorts before `'000'`. `''` is the column
    /// default and the value `put_item` writes. Such rows are never returned
    /// by a parallel scan.
    pub fn scan_items(
        &self,
        table_name: &str,
        params: &ScanParams,
    ) -> Result<Vec<(String, String, String)>> {
        let mut sql = format!(
            "SELECT pk, sk, item_json FROM \"{}\"",
            escape_table_name(table_name)
        );

        // Placeholders are numbered from param_idx; params_vec must be pushed in
        // the same order as the clauses that reference them.
        let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut where_clauses = Vec::new();
        let mut param_idx = 1;

        // For parallel scan with hash_prefix-based segment filtering
        let is_parallel = params.segment.is_some() && params.total_segments.is_some();

        if let (Some(start_pk), Some(start_sk)) =
            (params.exclusive_start_pk, params.exclusive_start_sk)
        {
            if is_parallel {
                // For parallel scans, pagination must respect hash_prefix ordering.
                // The start pk/sk placeholders are referenced twice: once in the
                // subquery that looks up the start row's hash_prefix, and once in
                // the tuple itself.
                where_clauses.push(format!(
                    "(hash_prefix, pk, sk) > ((SELECT hash_prefix FROM \"{}\" WHERE pk = ?{} AND sk = ?{} LIMIT 1), ?{}, ?{})",
                    escape_table_name(table_name),
                    param_idx, param_idx + 1,
                    param_idx, param_idx + 1
                ));
            } else {
                where_clauses.push(format!("(pk, sk) > (?{}, ?{})", param_idx, param_idx + 1));
            }
            params_vec.push(Box::new(start_pk.to_string()));
            params_vec.push(Box::new(start_sk.to_string()));
            param_idx += 2;
        }

        if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
            // Use hash_prefix column for segment assignment.
            // Bucket = parseInt(hash_prefix[0..3], 16).
            // Segment owns buckets from ceil(4096*seg/total) to ceil(4096*(seg+1)/total)-1.
            let start_bucket = ceiling_div(HASH_BUCKETS * seg, total);
            let end_bucket = ceiling_div(HASH_BUCKETS * (seg + 1), total) - 1;
            // Zero-padded lowercase hex, so string comparison matches numeric order.
            let start_hex = format!("{:03x}", start_bucket);
            let end_hex = format!("{:03x}", end_bucket);
            // hash_prefix is a 6-char hex string; compare the first 3 chars
            where_clauses.push(format!(
                "substr(hash_prefix, 1, 3) >= ?{} AND substr(hash_prefix, 1, 3) <= ?{}",
                param_idx,
                param_idx + 1
            ));
            params_vec.push(Box::new(start_hex));
            params_vec.push(Box::new(end_hex));
        }

        if !where_clauses.is_empty() {
            sql.push_str(" WHERE ");
            sql.push_str(&where_clauses.join(" AND "));
        }

        // For parallel scans, order by hash_prefix for dynalite-compatible behaviour.
        // For regular scans, use pk/sk ordering.
        if is_parallel {
            sql.push_str(" ORDER BY hash_prefix ASC, pk ASC, sk ASC");
        } else {
            sql.push_str(" ORDER BY pk ASC, sk ASC");
        }

        if let Some(lim) = params.limit {
            sql.push_str(&format!(" LIMIT {lim}"));
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            params_vec.iter().map(|p| p.as_ref()).collect();
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt
            .query_map(param_refs.as_slice(), |row| {
                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
            })?
            .collect::<std::result::Result<Vec<_>, _>>()?;

        Ok(rows)
    }
1597
1598    /// Count items in a table.
1599    pub fn count_items(&self, table_name: &str) -> Result<i64> {
1600        let sql = format!("SELECT COUNT(*) FROM \"{}\"", escape_table_name(table_name));
1601        let count: i64 = self.conn.query_row(&sql, [], |row| row.get(0))?;
1602        Ok(count)
1603    }
1604
1605    // -----------------------------------------------------------------------
1606    // Introspection
1607    // -----------------------------------------------------------------------
1608
1609    /// Get the database file path, or `None` for in-memory databases.
1610    pub fn db_path(&self) -> Option<String> {
1611        self.conn
1612            .path()
1613            .filter(|p| !p.is_empty())
1614            .map(|p| p.to_owned())
1615    }
1616
1617    /// Get the total database size in bytes.
1618    pub fn db_size_bytes(&self) -> Result<u64> {
1619        let size: i64 = self.conn.query_row(
1620            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1621            [],
1622            |row| row.get(0),
1623        )?;
1624        Ok(size as u64)
1625    }
1626
1627    /// Count the number of DynamoDB tables.
1628    pub fn table_count(&self) -> Result<usize> {
1629        let count: i64 = self
1630            .conn
1631            .query_row("SELECT COUNT(*) FROM _tables", [], |row| row.get(0))?;
1632        Ok(count as usize)
1633    }
1634
1635    /// Get per-table statistics: name, item count, and approximate size in bytes.
1636    ///
1637    /// Uses a single query per table (COUNT + SUM combined) instead of separate queries.
1638    pub fn table_stats(&self) -> Result<Vec<TableStats>> {
1639        let table_names = self.list_table_names()?;
1640        let mut stats = Vec::with_capacity(table_names.len());
1641        for name in table_names {
1642            let sql = format!(
1643                "SELECT COUNT(*), COALESCE(SUM(item_size), 0) FROM \"{}\"",
1644                escape_table_name(&name)
1645            );
1646            let (item_count, size_bytes): (i64, i64) = self
1647                .conn
1648                .query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
1649            stats.push(TableStats {
1650                table_name: name,
1651                item_count,
1652                size_bytes: size_bytes as u64,
1653            });
1654        }
1655        Ok(stats)
1656    }
1657
1658    /// Get combined database info in a single call for atomic consistency.
1659    ///
1660    /// Returns all introspection data that `get_database_info` tools need
1661    /// without releasing the lock between queries.
1662    pub fn database_info(&self) -> Result<DatabaseInfo> {
1663        let path = self.db_path();
1664        let size_bytes = self.db_size_bytes()?;
1665        let table_count = self.table_count()?;
1666        let stats = self.table_stats()?;
1667
1668        let mut table_details = Vec::with_capacity(stats.len());
1669        for s in stats {
1670            let metadata = self.get_table_metadata(&s.table_name)?;
1671            table_details.push(TableInfoEntry { stats: s, metadata });
1672        }
1673
1674        Ok(DatabaseInfo {
1675            path,
1676            size_bytes,
1677            table_count,
1678            tables: table_details,
1679        })
1680    }
1681
1682    // -----------------------------------------------------------------------
1683    // Snapshot operations
1684    // -----------------------------------------------------------------------
1685
1686    /// Create a snapshot of the database by copying it to the given path.
1687    /// Uses `VACUUM INTO` which works for both in-memory and file-backed databases.
1688    pub fn vacuum_into(&self, path: &str) -> Result<()> {
1689        if path.contains('\0') {
1690            return Err(DynoxideError::ValidationException(
1691                "path contains null byte".to_string(),
1692            ));
1693        }
1694        self.conn
1695            .execute_batch(&format!("VACUUM INTO '{}'", path.replace('\'', "''")))?;
1696        Ok(())
1697    }
1698
1699    /// Run VACUUM to compact the database file in place.
1700    pub fn vacuum(&self) -> Result<()> {
1701        self.conn.execute_batch("VACUUM")?;
1702        Ok(())
1703    }
1704
1705    /// Restore the database from a snapshot file using SQLite's backup API.
1706    /// This replaces the current database contents with the snapshot contents.
1707    /// Works for both in-memory and file-backed databases.
1708    pub fn restore_from(&mut self, path: &str) -> Result<()> {
1709        let source = Connection::open(path)?;
1710        self.restore_from_connection(&source)
1711    }
1712
1713    /// Backup the current database to a new in-memory SQLite connection.
1714    ///
1715    /// Used for in-memory snapshot storage — the returned connection holds
1716    /// a complete copy of the database without touching the filesystem.
1717    pub fn backup_to_memory(&self) -> Result<Connection> {
1718        let mut dest = Connection::open_in_memory()?;
1719        {
1720            let backup = rusqlite::backup::Backup::new(&self.conn, &mut dest)?;
1721            backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1722        }
1723        Ok(dest)
1724    }
1725
1726    /// Restore the database from another SQLite connection using the backup API.
1727    ///
1728    /// Replaces the current database contents with the source connection's
1729    /// contents. Invalidates the metadata cache.
1730    pub fn restore_from_connection(&mut self, source: &Connection) -> Result<()> {
1731        let backup = rusqlite::backup::Backup::new(source, &mut self.conn)?;
1732        backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1733        self.metadata_cache.borrow_mut().clear();
1734        Ok(())
1735    }
1736
1737    /// Get the database size in bytes for an arbitrary connection.
1738    pub fn connection_size_bytes(conn: &Connection) -> Result<u64> {
1739        let size: i64 = conn.query_row(
1740            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1741            [],
1742            |row| row.get(0),
1743        )?;
1744        Ok(size as u64)
1745    }
1746
1747    // -----------------------------------------------------------------------
1748    // Stream operations
1749    // -----------------------------------------------------------------------
1750
1751    /// Enable streams on a table.
1752    pub fn enable_stream(&self, table_name: &str, view_type: &str, label: &str) -> Result<()> {
1753        self.conn.execute(
1754            "UPDATE _tables SET stream_enabled = 1, stream_view_type = ?1, stream_label = ?2 WHERE table_name = ?3",
1755            params![view_type, label, table_name],
1756        )?;
1757        self.metadata_cache.borrow_mut().remove(table_name);
1758        Ok(())
1759    }
1760
1761    /// Disable streams on a table.
1762    pub fn disable_stream(&self, table_name: &str) -> Result<()> {
1763        self.conn.execute(
1764            "UPDATE _tables SET stream_enabled = 0 WHERE table_name = ?1",
1765            params![table_name],
1766        )?;
1767        self.metadata_cache.borrow_mut().remove(table_name);
1768        Ok(())
1769    }
1770
1771    /// Insert a stream record.
1772    #[allow(clippy::too_many_arguments)]
1773    pub fn insert_stream_record(
1774        &self,
1775        table_name: &str,
1776        event_name: &str,
1777        keys_json: &str,
1778        new_image: Option<&str>,
1779        old_image: Option<&str>,
1780        sequence_number: &str,
1781        shard_id: &str,
1782        created_at: i64,
1783    ) -> Result<()> {
1784        self.insert_stream_record_with_identity(
1785            table_name,
1786            event_name,
1787            keys_json,
1788            new_image,
1789            old_image,
1790            sequence_number,
1791            shard_id,
1792            created_at,
1793            None,
1794        )
1795    }
1796
1797    /// Insert a stream record with optional user identity (for TTL deletions).
1798    #[allow(clippy::too_many_arguments)]
1799    pub fn insert_stream_record_with_identity(
1800        &self,
1801        table_name: &str,
1802        event_name: &str,
1803        keys_json: &str,
1804        new_image: Option<&str>,
1805        old_image: Option<&str>,
1806        sequence_number: &str,
1807        shard_id: &str,
1808        created_at: i64,
1809        user_identity: Option<&str>,
1810    ) -> Result<()> {
1811        self.conn.execute(
1812            "INSERT INTO _stream_records (table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity)
1813             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1814            params![table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity],
1815        )?;
1816        Ok(())
1817    }
1818
1819    /// Get the next sequence number for a table's stream.
1820    pub fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64> {
1821        let result: std::result::Result<i64, _> = self.conn.query_row(
1822            "SELECT COALESCE(MAX(CAST(sequence_number AS INTEGER)), 0) + 1 FROM _stream_records WHERE table_name = ?1",
1823            params![table_name],
1824            |row| row.get(0),
1825        );
1826        match result {
1827            Ok(n) => Ok(n),
1828            Err(_) => Ok(1),
1829        }
1830    }
1831
1832    /// Get stream records for a shard starting after a given sequence number.
1833    pub fn get_stream_records(
1834        &self,
1835        table_name: &str,
1836        shard_id: &str,
1837        after_sequence: i64,
1838        limit: usize,
1839    ) -> Result<Vec<StreamRecord>> {
1840        let mut stmt = self.conn.prepare(
1841            "SELECT event_name, keys_json, new_image, old_image, sequence_number, created_at, user_identity
1842             FROM _stream_records
1843             WHERE table_name = ?1 AND shard_id = ?2 AND CAST(sequence_number AS INTEGER) > ?3
1844             ORDER BY CAST(sequence_number AS INTEGER) ASC
1845             LIMIT ?4",
1846        )?;
1847        let rows = stmt
1848            .query_map(
1849                params![table_name, shard_id, after_sequence, limit as i64],
1850                |row| {
1851                    Ok(StreamRecord {
1852                        event_name: row.get(0)?,
1853                        keys_json: row.get(1)?,
1854                        new_image: row.get(2)?,
1855                        old_image: row.get(3)?,
1856                        sequence_number: row.get(4)?,
1857                        created_at: row.get(5)?,
1858                        user_identity: row.get(6)?,
1859                    })
1860                },
1861            )?
1862            .collect::<std::result::Result<Vec<_>, _>>()?;
1863        Ok(rows)
1864    }
1865
1866    /// List tables that have streams enabled.
1867    pub fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1868        let sql = format!(
1869            "SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE stream_enabled = 1 ORDER BY table_name"
1870        );
1871        let mut stmt = self.conn.prepare(&sql)?;
1872        let rows = stmt
1873            .query_map([], row_to_metadata)?
1874            .collect::<std::result::Result<Vec<_>, _>>()?;
1875        Ok(rows)
1876    }
1877
1878    // -----------------------------------------------------------------------
1879    // TTL operations
1880    // -----------------------------------------------------------------------
1881
1882    /// Update TTL configuration for a table.
1883    pub fn update_ttl_config(
1884        &self,
1885        table_name: &str,
1886        attribute_name: Option<&str>,
1887        enabled: bool,
1888    ) -> Result<()> {
1889        self.conn.execute(
1890            "UPDATE _tables SET ttl_attribute = ?1, ttl_enabled = ?2 WHERE table_name = ?3",
1891            params![attribute_name, enabled as i32, table_name],
1892        )?;
1893        self.metadata_cache.borrow_mut().remove(table_name);
1894        Ok(())
1895    }
1896
1897    /// List tables that have TTL enabled.
1898    pub fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1899        let sql = format!(
1900            "SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE ttl_enabled = 1 ORDER BY table_name"
1901        );
1902        let mut stmt = self.conn.prepare(&sql)?;
1903        let rows = stmt
1904            .query_map([], row_to_metadata)?
1905            .collect::<std::result::Result<Vec<_>, _>>()?;
1906        Ok(rows)
1907    }
1908
1909    /// Get the min and max sequence numbers for a shard.
1910    pub fn get_shard_sequence_range(
1911        &self,
1912        table_name: &str,
1913        shard_id: &str,
1914    ) -> Result<(Option<String>, Option<String>)> {
1915        let result: std::result::Result<(Option<String>, Option<String>), _> = self.conn.query_row(
1916            "SELECT MIN(sequence_number), MAX(sequence_number) FROM _stream_records WHERE table_name = ?1 AND shard_id = ?2",
1917            params![table_name, shard_id],
1918            |row| Ok((row.get(0)?, row.get(1)?)),
1919        );
1920        match result {
1921            Ok(range) => Ok(range),
1922            Err(_) => Ok((None, None)),
1923        }
1924    }
1925
1926    // -----------------------------------------------------------------------
1927    // Cache tracking (cached_at)
1928    // -----------------------------------------------------------------------
1929
1930    /// Update the `cached_at` timestamp for a single item.
1931    pub fn touch_cached_at(
1932        &self,
1933        table_name: &str,
1934        pk: &str,
1935        sk: &str,
1936        timestamp: f64,
1937    ) -> Result<()> {
1938        let sql = format!(
1939            "UPDATE \"{}\" SET cached_at = ?1 WHERE pk = ?2 AND sk = ?3",
1940            escape_table_name(table_name)
1941        );
1942        self.conn.execute(&sql, params![timestamp, pk, sk])?;
1943        Ok(())
1944    }
1945
1946    /// Get items ordered by `cached_at` (oldest first) for LRU eviction.
1947    ///
1948    /// Returns `(pk, sk, item_size)` tuples. Items with NULL `cached_at`
1949    /// are excluded (they were never cached from a remote source).
1950    pub fn get_lru_items(
1951        &self,
1952        table_name: &str,
1953        limit: usize,
1954    ) -> Result<Vec<(String, String, i64)>> {
1955        let sql = format!(
1956            "SELECT pk, sk, item_size FROM \"{}\" WHERE cached_at IS NOT NULL ORDER BY cached_at ASC LIMIT ?1",
1957            escape_table_name(table_name)
1958        );
1959        let mut stmt = self.conn.prepare(&sql)?;
1960        let rows = stmt
1961            .query_map(params![limit as i64], |row| {
1962                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1963            })?
1964            .collect::<std::result::Result<Vec<_>, _>>()?;
1965        Ok(rows)
1966    }
1967}
1968
/// A stream record from the `_stream_records` table.
///
/// Values are returned exactly as stored by
/// `insert_stream_record_with_identity`.
#[derive(Debug, Clone)]
pub struct StreamRecord {
    /// Event name, from the `event_name` column.
    pub event_name: String,
    /// Key of the affected item, as the JSON text stored in `keys_json`.
    pub keys_json: String,
    /// New item image, if one was recorded (presumably the serialized item
    /// after the change — confirm against the writer).
    pub new_image: Option<String>,
    /// Old item image, if one was recorded (presumably the serialized item
    /// before the change — confirm against the writer).
    pub old_image: Option<String>,
    /// Sequence number as stored (text). Queries in this module compare it
    /// numerically via `CAST(sequence_number AS INTEGER)`.
    pub sequence_number: String,
    /// Value of the `created_at` column (time unit not fixed here — TODO
    /// confirm).
    pub created_at: i64,
    /// Optional user identity; the writer documents it as used for TTL
    /// deletions.
    pub user_identity: Option<String>,
}
1980
/// Per-table statistics returned by `Storage::table_stats()`.
#[derive(Debug, Clone)]
pub struct TableStats {
    /// DynamoDB table name.
    pub table_name: String,
    /// Number of rows in the table's data table (`COUNT(*)`).
    pub item_count: i64,
    /// Sum of the `item_size` column over all rows; `0` for an empty table.
    pub size_bytes: u64,
}
1988
/// Combined database introspection info returned by `Storage::database_info()`.
#[derive(Debug, Clone)]
pub struct DatabaseInfo {
    /// Database file path, or `None` for in-memory databases.
    pub path: Option<String>,
    /// Total database size in bytes (`page_count * page_size`).
    pub size_bytes: u64,
    /// Number of rows in the `_tables` metadata table.
    pub table_count: usize,
    /// Statistics plus metadata for each table.
    pub tables: Vec<TableInfoEntry>,
}
1997
/// Per-table stats + metadata for `DatabaseInfo`.
#[derive(Debug, Clone)]
pub struct TableInfoEntry {
    /// Item count and total item size for the table.
    pub stats: TableStats,
    /// The table's `_tables` row, or `None` if `get_table_metadata` found none.
    pub metadata: Option<TableMetadata>,
}
2004
/// Prepare a table name for use inside a double-quoted SQL identifier.
///
/// SQL escapes an embedded `"` in a quoted identifier by doubling it. Every
/// other character is copied through unchanged. The caller still supplies
/// the surrounding quotes.
pub(crate) fn escape_table_name(name: &str) -> String {
    let mut escaped = String::with_capacity(name.len());
    for ch in name.chars() {
        if ch == '"' {
            escaped.push('"');
        }
        escaped.push(ch);
    }
    escaped
}
2009
/// Metadata row from the `_tables` table.
///
/// Note: The `tags` column is intentionally excluded. Tags are not on the hot
/// path for item operations and are accessed via separate `get_tags`/`set_tags`
/// methods to keep the metadata cache lean.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// DynamoDB table name.
    pub table_name: String,
    /// Key schema, stored as JSON text.
    pub key_schema: String,
    /// Attribute definitions, stored as JSON text.
    pub attribute_definitions: String,
    /// Global secondary index definitions, if any (presumably JSON text — confirm).
    pub gsi_definitions: Option<String>,
    /// Local secondary index definitions, if any (presumably JSON text — confirm).
    pub lsi_definitions: Option<String>,
    /// Whether streams are enabled (stored as INTEGER; non-zero means true).
    pub stream_enabled: bool,
    /// Stream view type set by `enable_stream`, e.g. `NEW_AND_OLD_IMAGES`.
    pub stream_view_type: Option<String>,
    /// Stream label set by `enable_stream`.
    pub stream_label: Option<String>,
    /// Name of the TTL attribute, if configured.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled (stored as INTEGER; non-zero means true).
    pub ttl_enabled: bool,
    /// Value of the `created_at` column (time unit not fixed here — TODO confirm).
    pub created_at: i64,
    /// Table status string, e.g. `ACTIVE`.
    pub table_status: String,
    /// Billing mode, if recorded.
    pub billing_mode: Option<String>,
    /// Provisioned throughput settings, if recorded (format not shown here).
    pub provisioned_throughput: Option<String>,
    /// Server-side encryption specification, if recorded.
    pub sse_specification: Option<String>,
    /// Table class, if recorded.
    pub table_class: Option<String>,
    /// Whether deletion protection is on. `row_to_metadata` reads this
    /// leniently, treating an unreadable (e.g. NULL) value as `false`.
    pub deletion_protection_enabled: bool,
}
2035
/// The standard SELECT column list for _tables queries.
///
/// The order is load-bearing: `row_to_metadata` reads these columns by
/// position (0 = `table_name` ... 16 = `deletion_protection_enabled`), so any
/// change here must be mirrored there.
const TABLE_METADATA_COLUMNS: &str = "table_name, key_schema, attribute_definitions, gsi_definitions, \
     lsi_definitions, stream_enabled, stream_view_type, stream_label, ttl_attribute, ttl_enabled, \
     created_at, table_status, billing_mode, provisioned_throughput, \
     sse_specification, table_class, deletion_protection_enabled";
2041
2042/// Map a row from the _tables SELECT to a TableMetadata struct.
2043fn row_to_metadata(row: &rusqlite::Row) -> rusqlite::Result<TableMetadata> {
2044    Ok(TableMetadata {
2045        table_name: row.get(0)?,
2046        key_schema: row.get(1)?,
2047        attribute_definitions: row.get(2)?,
2048        gsi_definitions: row.get(3)?,
2049        lsi_definitions: row.get(4)?,
2050        stream_enabled: row.get::<_, i32>(5)? != 0,
2051        stream_view_type: row.get(6)?,
2052        stream_label: row.get(7)?,
2053        ttl_attribute: row.get(8)?,
2054        ttl_enabled: row.get::<_, i32>(9)? != 0,
2055        created_at: row.get(10)?,
2056        table_status: row.get(11)?,
2057        billing_mode: row.get(12)?,
2058        provisioned_throughput: row.get(13)?,
2059        sse_specification: row.get(14)?,
2060        table_class: row.get(15)?,
2061        deletion_protection_enabled: row.get::<_, i32>(16).unwrap_or(0) != 0,
2062    })
2063}
2064
2065#[cfg(test)]
2066mod tests {
2067    use super::*;
2068
2069    fn test_storage() -> Storage {
2070        Storage::memory().expect("Failed to create in-memory storage")
2071    }
2072
2073    #[test]
2074    fn test_initialize_creates_metadata_tables() {
2075        let storage = test_storage();
2076        // _config and _tables should exist
2077        let version: String = storage
2078            .conn()
2079            .query_row(
2080                "SELECT value FROM _config WHERE key = 'schema_version'",
2081                [],
2082                |row| row.get(0),
2083            )
2084            .unwrap();
2085        assert_eq!(version, SCHEMA_VERSION);
2086    }
2087
2088    #[test]
2089    fn test_wal_mode_enabled() {
2090        let storage = test_storage();
2091        let mode: String = storage
2092            .conn()
2093            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
2094            .unwrap();
2095        // In-memory databases may report "memory" instead of "wal"
2096        assert!(mode == "wal" || mode == "memory", "Got mode: {mode}");
2097    }
2098
2099    #[test]
2100    fn test_table_metadata_crud() {
2101        let storage = test_storage();
2102
2103        // Initially no tables
2104        assert!(!storage.table_exists("TestTable").unwrap());
2105        assert!(storage.list_table_names().unwrap().is_empty());
2106
2107        // Insert metadata
2108        storage
2109            .insert_table_metadata(&CreateTableMetadata {
2110                table_name: "TestTable",
2111                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2112                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2113                created_at: 1000000,
2114                ..Default::default()
2115            })
2116            .unwrap();
2117
2118        assert!(storage.table_exists("TestTable").unwrap());
2119        assert_eq!(storage.list_table_names().unwrap(), vec!["TestTable"]);
2120
2121        // Get metadata
2122        let meta = storage.get_table_metadata("TestTable").unwrap().unwrap();
2123        assert_eq!(meta.table_name, "TestTable");
2124        assert_eq!(meta.table_status, "ACTIVE");
2125        assert_eq!(meta.created_at, 1000000);
2126
2127        // Delete metadata
2128        assert!(storage.delete_table_metadata("TestTable").unwrap());
2129        assert!(!storage.table_exists("TestTable").unwrap());
2130    }
2131
2132    #[test]
2133    fn test_create_and_drop_data_table() {
2134        let storage = test_storage();
2135        storage.create_data_table("MyTable").unwrap();
2136
2137        // Should be able to insert into it
2138        storage
2139            .put_item("MyTable", "pk1", "", r#"{"pk":{"S":"pk1"}}"#, 10)
2140            .unwrap();
2141
2142        let item = storage.get_item("MyTable", "pk1", "").unwrap();
2143        assert!(item.is_some());
2144
2145        storage.drop_data_table("MyTable").unwrap();
2146    }
2147
2148    #[test]
2149    fn test_item_crud() {
2150        let storage = test_storage();
2151        storage.create_data_table("Items").unwrap();
2152
2153        // Put item
2154        let old = storage
2155            .put_item(
2156                "Items",
2157                "user#1",
2158                "profile",
2159                r#"{"name":{"S":"Alice"}}"#,
2160                20,
2161            )
2162            .unwrap();
2163        assert!(old.is_none()); // No previous item
2164
2165        // Get item
2166        let item = storage.get_item("Items", "user#1", "profile").unwrap();
2167        assert_eq!(item.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2168
2169        // Replace item (returns old)
2170        let old = storage
2171            .put_item("Items", "user#1", "profile", r#"{"name":{"S":"Bob"}}"#, 18)
2172            .unwrap();
2173        assert_eq!(old.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2174
2175        // Delete item
2176        let deleted = storage.delete_item("Items", "user#1", "profile").unwrap();
2177        assert_eq!(deleted.unwrap(), r#"{"name":{"S":"Bob"}}"#);
2178
2179        // Item should be gone
2180        assert!(
2181            storage
2182                .get_item("Items", "user#1", "profile")
2183                .unwrap()
2184                .is_none()
2185        );
2186    }
2187
2188    #[test]
2189    fn test_query_items() {
2190        let storage = test_storage();
2191        storage.create_data_table("Orders").unwrap();
2192
2193        // Insert several items for the same partition key
2194        for i in 1..=5 {
2195            let sk = format!("order#{i:03}");
2196            let json = format!(r#"{{"id":{{"N":"{i}"}}}}"#);
2197            storage
2198                .put_item("Orders", "user#1", &sk, &json, 10)
2199                .unwrap();
2200        }
2201
2202        // Query all for partition key
2203        let results = storage
2204            .query_items(
2205                "Orders",
2206                "user#1",
2207                &QueryParams {
2208                    forward: true,
2209                    ..Default::default()
2210                },
2211            )
2212            .unwrap();
2213        assert_eq!(results.len(), 5);
2214        assert_eq!(results[0].1, "order#001"); // Sorted ascending
2215
2216        // Query with limit
2217        let results = storage
2218            .query_items(
2219                "Orders",
2220                "user#1",
2221                &QueryParams {
2222                    forward: true,
2223                    limit: Some(2),
2224                    ..Default::default()
2225                },
2226            )
2227            .unwrap();
2228        assert_eq!(results.len(), 2);
2229
2230        // Query reverse
2231        let results = storage
2232            .query_items(
2233                "Orders",
2234                "user#1",
2235                &QueryParams {
2236                    forward: false,
2237                    limit: Some(2),
2238                    ..Default::default()
2239                },
2240            )
2241            .unwrap();
2242        assert_eq!(results.len(), 2);
2243        assert_eq!(results[0].1, "order#005"); // Sorted descending
2244    }
2245
2246    #[test]
2247    fn test_scan_items() {
2248        let storage = test_storage();
2249        storage.create_data_table("ScanTest").unwrap();
2250
2251        storage.put_item("ScanTest", "a", "1", r#"{}"#, 2).unwrap();
2252        storage.put_item("ScanTest", "b", "2", r#"{}"#, 2).unwrap();
2253        storage.put_item("ScanTest", "c", "3", r#"{}"#, 2).unwrap();
2254
2255        let results = storage.scan_items("ScanTest", &Default::default()).unwrap();
2256        assert_eq!(results.len(), 3);
2257
2258        // Scan with limit
2259        let results = storage
2260            .scan_items(
2261                "ScanTest",
2262                &ScanParams {
2263                    limit: Some(2),
2264                    ..Default::default()
2265                },
2266            )
2267            .unwrap();
2268        assert_eq!(results.len(), 2);
2269
2270        // Scan with pagination
2271        let results = storage
2272            .scan_items(
2273                "ScanTest",
2274                &ScanParams {
2275                    limit: Some(2),
2276                    exclusive_start_pk: Some("a"),
2277                    exclusive_start_sk: Some("1"),
2278                    ..Default::default()
2279                },
2280            )
2281            .unwrap();
2282        assert_eq!(results.len(), 2);
2283        assert_eq!(results[0].0, "b"); // Skipped "a"
2284    }
2285
2286    #[test]
2287    fn test_count_items() {
2288        let storage = test_storage();
2289        storage.create_data_table("CountTest").unwrap();
2290
2291        assert_eq!(storage.count_items("CountTest").unwrap(), 0);
2292
2293        storage.put_item("CountTest", "a", "", r#"{}"#, 2).unwrap();
2294        storage.put_item("CountTest", "b", "", r#"{}"#, 2).unwrap();
2295
2296        assert_eq!(storage.count_items("CountTest").unwrap(), 2);
2297    }
2298
2299    #[test]
2300    fn test_gsi_table_lifecycle() {
2301        let storage = test_storage();
2302        storage.create_gsi_table("Orders", "ByDate").unwrap();
2303
2304        // Should be able to write to the GSI table via raw SQL
2305        let gsi_name = "Orders::gsi::ByDate";
2306        let sql = format!(
2307            "INSERT INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
2308            gsi_name.replace('"', "\"\"")
2309        );
2310        storage
2311            .conn()
2312            .execute(
2313                &sql,
2314                params!["2024-01-01", "001", "user#1", "order#001", r#"{}"#],
2315            )
2316            .unwrap();
2317
2318        storage.drop_gsi_table("Orders", "ByDate").unwrap();
2319    }
2320
2321    #[test]
2322    fn test_nonexistent_table_metadata() {
2323        let storage = test_storage();
2324        assert!(storage.get_table_metadata("Nonexistent").unwrap().is_none());
2325        assert!(!storage.delete_table_metadata("Nonexistent").unwrap());
2326    }
2327
2328    #[test]
2329    fn test_metadata_cache_hit() {
2330        let storage = test_storage();
2331        storage
2332            .insert_table_metadata(&CreateTableMetadata {
2333                table_name: "CacheTest",
2334                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2335                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2336                created_at: 1000000,
2337                ..Default::default()
2338            })
2339            .unwrap();
2340
2341        // First call populates cache
2342        let meta1 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2343        assert_eq!(meta1.table_name, "CacheTest");
2344
2345        // Second call should hit cache (same result)
2346        let meta2 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2347        assert_eq!(meta2.table_name, "CacheTest");
2348        assert_eq!(meta1.created_at, meta2.created_at);
2349
2350        // Cache should have the entry
2351        assert!(storage.metadata_cache.borrow().contains_key("CacheTest"));
2352    }
2353
2354    #[test]
2355    fn test_metadata_cache_invalidated_on_delete() {
2356        let storage = test_storage();
2357        storage
2358            .insert_table_metadata(&CreateTableMetadata {
2359                table_name: "DelCache",
2360                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2361                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2362                created_at: 1000000,
2363                ..Default::default()
2364            })
2365            .unwrap();
2366
2367        // Populate cache
2368        storage.get_table_metadata("DelCache").unwrap();
2369        assert!(storage.metadata_cache.borrow().contains_key("DelCache"));
2370
2371        // Delete should invalidate cache
2372        storage.delete_table_metadata("DelCache").unwrap();
2373        assert!(!storage.metadata_cache.borrow().contains_key("DelCache"));
2374    }
2375
2376    #[test]
2377    fn test_metadata_cache_invalidated_on_stream_enable() {
2378        let storage = test_storage();
2379        storage
2380            .insert_table_metadata(&CreateTableMetadata {
2381                table_name: "StreamCache",
2382                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2383                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2384                created_at: 1000000,
2385                ..Default::default()
2386            })
2387            .unwrap();
2388
2389        // Populate cache
2390        let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2391        assert!(!meta.stream_enabled);
2392
2393        // Enable stream should invalidate cache
2394        storage
2395            .enable_stream("StreamCache", "NEW_AND_OLD_IMAGES", "2024-01-01T00:00:00")
2396            .unwrap();
2397        assert!(!storage.metadata_cache.borrow().contains_key("StreamCache"));
2398
2399        // Next get should reflect the change
2400        let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2401        assert!(meta.stream_enabled);
2402    }
2403
2404    #[test]
2405    fn test_metadata_cache_invalidated_on_ttl_update() {
2406        let storage = test_storage();
2407        storage
2408            .insert_table_metadata(&CreateTableMetadata {
2409                table_name: "TtlCache",
2410                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2411                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2412                created_at: 1000000,
2413                ..Default::default()
2414            })
2415            .unwrap();
2416
2417        // Populate cache
2418        let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2419        assert!(!meta.ttl_enabled);
2420
2421        // Update TTL should invalidate cache
2422        storage
2423            .update_ttl_config("TtlCache", Some("expires_at"), true)
2424            .unwrap();
2425        assert!(!storage.metadata_cache.borrow().contains_key("TtlCache"));
2426
2427        // Next get should reflect the change
2428        let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2429        assert!(meta.ttl_enabled);
2430        assert_eq!(meta.ttl_attribute, Some("expires_at".to_string()));
2431    }
2432
2433    #[test]
2434    fn test_num_to_buffer_zero() {
2435        // numToBuffer("0") → [0x80]
2436        assert_eq!(num_to_buffer("0"), vec![0x80]);
2437        assert_eq!(num_to_buffer("-0"), vec![0x80]);
2438    }
2439
2440    #[test]
2441    fn test_hash_prefix_string_keys() {
2442        // Verify known hash prefixes for string keys used in scan tests.
2443        // These specific values determine which segment items land in.
2444        let h1 = compute_hash_prefix(&AttributeValue::S("3635".into()));
2445        let h2 = compute_hash_prefix(&AttributeValue::S("228".into()));
2446        let h3 = compute_hash_prefix(&AttributeValue::S("1668".into()));
2447        let h4 = compute_hash_prefix(&AttributeValue::S("3435".into()));
2448
2449        // With TotalSegments=4096, segment 0 owns bucket 0 only.
2450        // Items "3635" and "228" must be in segment 0 (bucket 0).
2451        assert_eq!(
2452            hash_bucket(&h1),
2453            0,
2454            "3635 should be bucket 0, got hash {h1}"
2455        );
2456        assert_eq!(hash_bucket(&h2), 0, "228 should be bucket 0, got hash {h2}");
2457
2458        // "1668" must be in segment 1 (bucket 1)
2459        assert_eq!(
2460            hash_bucket(&h3),
2461            1,
2462            "1668 should be bucket 1, got hash {h3}"
2463        );
2464
2465        // "3435" must be in segment 4 (bucket 4)
2466        assert_eq!(
2467            hash_bucket(&h4),
2468            4,
2469            "3435 should be bucket 4, got hash {h4}"
2470        );
2471    }
2472
2473    #[test]
2474    fn test_hash_prefix_number_keys() {
2475        // Verify number key hash prefixes from scan tests.
2476        // "251" must be in segment 1 (bucket 1) with TotalSegments=4096
2477        let h1 = compute_hash_prefix(&AttributeValue::N("251".into()));
2478        assert_eq!(hash_bucket(&h1), 1, "251 should be bucket 1, got hash {h1}");
2479
2480        // "2388" must be in segment 4095 (bucket 4095)
2481        let h2 = compute_hash_prefix(&AttributeValue::N("2388".into()));
2482        assert_eq!(
2483            hash_bucket(&h2),
2484            4095,
2485            "2388 should be bucket 4095, got hash {h2}"
2486        );
2487    }
2488
2489    #[test]
2490    fn test_hash_in_segment() {
2491        // bucket 0 should be in segment 0 of 4096
2492        assert!(hash_in_segment("000000", 0, 4096));
2493        assert!(!hash_in_segment("000000", 1, 4096));
2494
2495        // bucket 1 should be in segment 1 of 4096
2496        assert!(hash_in_segment("001000", 1, 4096));
2497        assert!(!hash_in_segment("001000", 0, 4096));
2498
2499        // bucket 4095 should be in segment 4095 of 4096
2500        assert!(hash_in_segment("fff000", 4095, 4096));
2501        assert!(!hash_in_segment("fff000", 0, 4096));
2502
2503        // With 2 segments: buckets 0-2047 in segment 0, 2048-4095 in segment 1
2504        assert!(hash_in_segment("000000", 0, 2));
2505        assert!(hash_in_segment("7ff000", 0, 2));
2506        assert!(hash_in_segment("800000", 1, 2));
2507        assert!(hash_in_segment("fff000", 1, 2));
2508    }
2509}