Skip to main content

dynoxide/
storage.rs

1use crate::errors::{DynoxideError, Result};
2use crate::types::AttributeValue;
3use rusqlite::{Connection, params};
4use std::cell::RefCell;
5use std::collections::HashMap;
6
/// Current on-disk schema version, stored under the `schema_version` key of
/// the `_config` table. `Storage::initialize` compares the stored value with
/// the `migrate_vN_to_vM` thresholds to decide which migrations still run.
const SCHEMA_VERSION: &str = "6";

/// Number of hash buckets used for parallel scan segment assignment.
/// Matches dynalite's implementation: the first three hex digits of a row's
/// `hash_prefix` (16^3 = 4096 values) select its bucket via `hash_bucket`.
const HASH_BUCKETS: u32 = 4096;
13
14// ---------------------------------------------------------------------------
15// MD5-based hash prefix for parallel scan (dynalite-compatible)
16// ---------------------------------------------------------------------------
17
18/// Compute the 6-character hex hash prefix for a partition key value.
19///
20/// Uses `MD5("Outliers" + key_bytes)` where `key_bytes` depends on the
21/// attribute type:
22/// - `S`: UTF-8 bytes of the string
23/// - `N`: Oracle packed BCD encoding via [`num_to_buffer`]
24/// - `B`: raw binary bytes
25///
26/// This matches dynalite's `hashPrefix` function.
27pub fn compute_hash_prefix(pk_value: &AttributeValue) -> String {
28    let key_bytes = match pk_value {
29        AttributeValue::S(s) => s.as_bytes().to_vec(),
30        AttributeValue::N(n) => num_to_buffer(n),
31        AttributeValue::B(b) => b.clone(),
32        _ => vec![], // Should not happen for valid keys
33    };
34
35    let digest = md5::compute([b"Outliers" as &[u8], &key_bytes].concat());
36    format!("{:032x}", digest)[..6].to_string()
37}
38
/// Compute the hash bucket (0..=4095) from a hex hash prefix.
///
/// Only the first three characters are used, parsed as hexadecimal; a prefix
/// shorter than three characters is parsed whole. Anything that cannot be
/// parsed maps to bucket 0 instead of panicking. This includes:
/// - an empty prefix, which is the column default for rows that predate
///   schema v6;
/// - a prefix whose third byte falls inside a multi-byte UTF-8 character.
pub fn hash_bucket(hash_prefix: &str) -> u32 {
    let end = hash_prefix.len().min(3);
    // `str::get` yields `None` when `end` is not on a char boundary, where
    // plain slicing (`&hash_prefix[..end]`) would panic.
    hash_prefix
        .get(..end)
        .and_then(|prefix| u32::from_str_radix(prefix, 16).ok())
        .unwrap_or(0)
}
44
/// Convert a DynamoDB number string to Oracle-style packed BCD bytes.
///
/// Faithfully ports dynalite's `numToBuffer` function from `db/index.js`.
/// Uses Big.js semantics: `num.s` (sign), `num.c` (coefficient digits) and
/// `num.e` (scientific-notation exponent, e.g. 2 for `251`). The JS code works
/// with `exponent = num.e + 1`, i.e. the `e` in `0.d1d2d3… × 10^e`. That is
/// exactly what [`extract_mantissa_and_exponent`] returns.
///
/// Layout of the result:
/// - byte 0: `floor((exponent + append_zero) / 2) - 64`, bitwise-inverted for
///   negative numbers;
/// - then one byte per base-100 digit pair of the mantissa. Positive numbers
///   store `pair + 1`; negative numbers store `101 - pair`. When the exponent
///   is odd, a leading zero digit is inserted first, so the pairs line up with
///   the base-100 exponent in byte 0.
/// - negative numbers with fewer than 20 mantissa bytes end with an extra
///   `102` terminator byte.
///
/// Zero, empty/whitespace-only input, and strings `BigDecimal` cannot parse
/// all encode as the single byte `0x80`.
fn num_to_buffer(num_str: &str) -> Vec<u8> {
    let trimmed = num_str.trim();
    if trimmed.is_empty() {
        return vec![0x80];
    }

    use bigdecimal::BigDecimal;
    use std::str::FromStr;

    let bd = match BigDecimal::from_str(trimmed) {
        Ok(v) => v,
        Err(_) => return vec![0x80],
    };

    // JS: `if (+num === 0) return Buffer.from([-128])` (-128 as a byte is 0x80).
    if bd.sign() == bigdecimal::num_bigint::Sign::NoSign {
        return vec![0x80];
    }

    let is_negative = bd.sign() == bigdecimal::num_bigint::Sign::Minus;
    let bd_abs = if is_negative { -&bd } else { bd.clone() };

    let (mantissa, exponent) = extract_mantissa_and_exponent(&bd_abs);
    if mantissa.is_empty() {
        return vec![0x80];
    }

    // JS: scale = num.s (-1 for negative, 1 for positive); represented here by
    // `is_negative`.
    // JS: appendZero = exponent % 2 ? 1 : 0
    // (Rust's `%` keeps the sign of `exponent`, but only `!= 0` is tested, so
    // negative odd exponents behave as in JS.)
    let append_zero: i64 = if exponent % 2 != 0 { 1 } else { 0 };
    // Number of digit-pair bytes: ceil((digits + append_zero) / 2).
    let byte_len_no_exp = ((mantissa.len() as i64 + append_zero + 1) / 2) as usize;

    // One exponent byte plus the pair bytes; short negative numbers also get
    // the trailing 102 terminator.
    let mut byte_array: Vec<u8>;
    if byte_len_no_exp < 20 && is_negative {
        byte_array = vec![0u8; byte_len_no_exp + 2];
        byte_array[byte_len_no_exp + 1] = 102;
    } else {
        byte_array = vec![0u8; byte_len_no_exp + 1];
    }

    // byteArray[0] = Math.floor((exponent + appendZero) / 2) - 64
    // `exp_sum` is negative for numbers below 0.1. Rust's `/` truncates toward
    // zero, but JS `Math.floor` rounds down: Math.floor(-3 / 2) is -2, while
    // -3 / 2 in Rust is -1. Hence `floor_div`.
    let exp_sum = exponent + append_zero;
    let exp_byte_val = floor_div(exp_sum, 2) - 64;
    if is_negative {
        // JS: byteArray[0] ^= 0xffffffff. XOR with all ones is a bitwise NOT;
        // the `as u8` truncation keeps only the low byte, matching the uint8
        // Buffer storage in JS.
        byte_array[0] = (exp_byte_val ^ !0i64) as u8;
    } else {
        byte_array[0] = exp_byte_val as u8;
    }

    // The main loop faithfully mirrors the JS for loop.
    // JS uses mantissaIndex as a signed int that can be decremented to -1: the
    // inserted leading zero consumes one iteration without consuming a digit.
    let mut mi: i64 = 0; // mantissaIndex (signed to allow -1)
    let mlen = mantissa.len() as i64;
    let mut appended_zero = false;

    while mi < mlen {
        // mi + append_zero is never negative here, so `/` equals floor division.
        let bai = ((mi + append_zero) / 2 + 1) as usize; // byteArrayIndex
        if append_zero != 0 && mi == 0 && !appended_zero {
            byte_array[bai] = 0;
            appended_zero = true;
            mi -= 1; // JS: mantissaIndex-- (the same digit is revisited next pass)
        } else if (mi + append_zero) % 2 == 0 {
            // First digit of a pair: tens place.
            byte_array[bai] = mantissa[mi as usize] * 10;
        } else {
            // Second digit of a pair: units place.
            byte_array[bai] += mantissa[mi as usize];
        }

        // Finalise the byte once its pair is complete (odd position) or the
        // mantissa has run out: positive → pair + 1, negative → 101 - pair.
        // After the zero-insertion step (mi == -1) neither condition holds.
        if ((mi + append_zero) % 2 != 0) || (mi == mlen - 1) {
            if is_negative {
                byte_array[bai] = 101u8.wrapping_sub(byte_array[bai]);
            } else {
                byte_array[bai] = byte_array[bai].wrapping_add(1);
            }
        }

        mi += 1; // JS: for loop increment
    }

    byte_array
}
134
/// Divide `a` by `b`, rounding the quotient toward negative infinity.
///
/// This mirrors JavaScript's `Math.floor(a / b)` for integers. Rust's `/`
/// instead truncates toward zero: `floor_div(-3, 2) == -2` but `-3 / 2 == -1`.
///
/// Panics if `b` is zero, like `/`.
fn floor_div(a: i64, b: i64) -> i64 {
    let (quotient, remainder) = (a / b, a % b);
    // Truncation rounded up exactly when there is a remainder and the
    // remainder (which carries the sign of `a`) and `b` have opposite signs.
    let truncated_upward = remainder != 0 && (remainder < 0) != (b < 0);
    if truncated_upward { quotient - 1 } else { quotient }
}
141
142/// Extract mantissa digits and exponent from a BigDecimal.
143///
144/// Returns (digits, exponent) matching Big.js semantics where:
145/// - digits: array of individual digits [2, 5, 1] for "251"
146/// - exponent: number of digits before the decimal point
147///   e.g., "251" → exponent=3, "0.012345" → exponent=-1
148fn extract_mantissa_and_exponent(bd: &bigdecimal::BigDecimal) -> (Vec<u8>, i64) {
149    // Normalize to remove trailing zeros
150    let normalized = bd.normalized();
151
152    // Get the string representation without scientific notation
153    // We need the digits and the position of the decimal point
154    let (bigint, scale) = normalized.as_bigint_and_exponent();
155    let digits_str = bigint.to_string();
156    let digits_str = digits_str.trim_start_matches('-');
157
158    let digits: Vec<u8> = digits_str
159        .chars()
160        .map(|c| c.to_digit(10).unwrap() as u8)
161        .collect();
162
163    // scale = number of digits after decimal point in the representation
164    // exponent (Big.js style) = number_of_digits - scale = digits.len() as i64 - scale
165    let exponent = digits.len() as i64 - scale;
166
167    (digits, exponent)
168}
169
170/// Check whether a hash_prefix falls within the range for a given segment.
171///
172/// Uses dynalite's 4096-bucket scheme:
173/// - bucket = parseInt(hash_prefix[0..3], 16)
174/// - segment owns buckets from ceil(4096 * segment / total) to
175///   ceil(4096 * (segment+1) / total) - 1
176pub fn hash_in_segment(hash_prefix: &str, segment: u32, total_segments: u32) -> bool {
177    let bucket = hash_bucket(hash_prefix);
178    let start = ceiling_div(HASH_BUCKETS * segment, total_segments);
179    let end = ceiling_div(HASH_BUCKETS * (segment + 1), total_segments) - 1;
180    bucket >= start && bucket <= end
181}
182
/// Unsigned division rounding the quotient up, i.e. `ceil(a / b)`.
///
/// Panics if `b` is zero.
fn ceiling_div(a: u32, b: u32) -> u32 {
    u32::div_ceil(a, b)
}
186
/// Parameters for scan operations (base table or GSI).
///
/// The derived `Default` leaves every field `None`.
#[derive(Debug, Default)]
pub struct ScanParams<'a> {
    /// Maximum number of rows to return (presumably unlimited when `None`).
    pub limit: Option<usize>,
    /// Pagination cursor: partition key of the previous page's last row.
    pub exclusive_start_pk: Option<&'a str>,
    /// Pagination cursor: sort key of the previous page's last row.
    pub exclusive_start_sk: Option<&'a str>,
    /// Parallel-scan segment index; presumably checked with `hash_in_segment`.
    pub segment: Option<u32>,
    /// Parallel-scan total segment count, paired with `segment`.
    pub total_segments: Option<u32>,
    /// For LSI pagination: base table PK for composite cursor.
    pub exclusive_start_base_pk: Option<&'a str>,
    /// For LSI pagination: base table SK for composite cursor.
    pub exclusive_start_base_sk: Option<&'a str>,
}
200
/// Parameters for inserting table metadata.
///
/// `Storage::insert_table_metadata` writes each field to the `_tables` column
/// of the same name. `Option` fields that are `None` are bound as SQL `NULL`.
/// NOTE(review): this means a `None` `billing_mode` stores `NULL` rather than
/// the column default `'PAY_PER_REQUEST'`. Confirm that readers handle it.
#[derive(Debug, Default)]
pub struct CreateTableMetadata<'a> {
    /// DynamoDB table name (the primary key of `_tables`).
    pub table_name: &'a str,
    /// Serialised key schema (presumably JSON; confirm with the caller).
    pub key_schema: &'a str,
    /// Serialised attribute definitions.
    pub attribute_definitions: &'a str,
    /// Serialised GSI definitions, if any. The v4→v5 migration parses this
    /// column as a JSON array of objects with an `IndexName` field.
    pub gsi_definitions: Option<&'a str>,
    /// Serialised LSI definitions, if any (same JSON shape as the GSIs).
    pub lsi_definitions: Option<&'a str>,
    /// Serialised provisioned throughput, if any.
    pub provisioned_throughput: Option<&'a str>,
    /// Creation time (NOTE(review): units not visible here; confirm).
    pub created_at: i64,
    /// Serialised SSE specification, if any.
    pub sse_specification: Option<&'a str>,
    /// Table class, if any.
    pub table_class: Option<&'a str>,
    /// Stored as the integer 0/1.
    pub deletion_protection_enabled: bool,
    /// Billing mode, if any (see the struct-level note on `None`).
    pub billing_mode: Option<&'a str>,
}
216
/// Parameters for query operations (base table or GSI).
///
/// NOTE(review): the derived `Default` sets `forward` to `false`. If
/// `forward` means ascending sort-key order (DynamoDB's `ScanIndexForward`
/// defaults to `true`), callers starting from `Default::default()` must set it
/// explicitly. Confirm.
#[derive(Debug, Default)]
pub struct QueryParams<'a> {
    /// Sort-key condition (NOTE(review): format defined by the caller; confirm
    /// whether this is a SQL fragment).
    pub sk_condition: Option<&'a str>,
    /// Values for `sk_condition`, presumably bound in order.
    pub sk_params: &'a [&'a str],
    /// Sort direction flag; presumably `true` means ascending sort-key order.
    pub forward: bool,
    /// Maximum number of rows to return.
    pub limit: Option<usize>,
    /// Pagination cursor: sort key of the previous page's last row.
    pub exclusive_start_sk: Option<&'a str>,
    /// For LSI pagination: base table PK for composite cursor.
    pub exclusive_start_base_pk: Option<&'a str>,
    /// For LSI pagination: base table SK for composite cursor.
    pub exclusive_start_base_sk: Option<&'a str>,
}
230
/// Low-level SQLite storage layer.
///
/// Manages the SQLite connection, metadata tables, and per-DynamoDB-table
/// data tables. All SQL lives here — higher layers work with Rust types.
pub struct Storage {
    /// The single SQLite connection every operation runs on.
    conn: Connection,
    /// In-memory cache of table metadata to avoid repeated SQLite reads,
    /// keyed by table name.
    ///
    /// `RefCell` makes `Storage` `!Sync`, so the compiler already rejects
    /// unsynchronised sharing across threads. The design assumes `Storage` is
    /// always behind `Arc<Mutex<>>` (NOTE(review): confirm at the call sites).
    /// Each method borrows the cache only briefly and releases it before
    /// returning, which avoids overlapping-borrow panics.
    metadata_cache: RefCell<HashMap<String, TableMetadata>>,
}
241
242impl Storage {
243    /// Open a persistent database at the given path.
244    pub fn new(path: &str) -> Result<Self> {
245        let conn = Connection::open(path)?;
246        let mut storage = Self {
247            conn,
248            metadata_cache: RefCell::new(HashMap::new()),
249        };
250        storage.initialize().map_err(Self::maybe_encrypted_error)?;
251        Ok(storage)
252    }
253
254    /// If a SQLite error is SQLITE_NOTADB, return a clearer error message
255    /// suggesting the database may be encrypted.
256    fn maybe_encrypted_error(err: DynoxideError) -> DynoxideError {
257        if let DynoxideError::SqliteError(ref sqlite_err) = err {
258            if let Some(rusqlite::ErrorCode::NotADatabase) = sqlite_err.sqlite_error_code() {
259                return DynoxideError::InternalServerError(
260                    "Database file is encrypted or not a valid SQLite database. \
261                     If encrypted, enable the `encryption` or `encryption-cc` feature \
262                     and use Database::new_encrypted() with the correct key."
263                        .to_string(),
264                );
265            }
266        }
267        err
268    }
269
270    /// Open or create an encrypted persistent database at the given path.
271    ///
272    /// The key is passed to SQLCipher via `PRAGMA key`. The database file is
273    /// encrypted at rest using AES-256-CBC. A database opened without calling
274    /// `PRAGMA key` is treated as a normal unencrypted database by SQLCipher.
275    #[cfg(feature = "_has-encryption")]
276    pub fn new_encrypted(path: &str, key: &str) -> Result<Self> {
277        use zeroize::Zeroize;
278
279        let conn = Connection::open(path)?;
280        // Safety: key is validated to be exactly 64 hex characters [0-9a-fA-F]
281        // by Database::new_encrypted(), so no injection is possible in the
282        // x'...' hex literal format. Note: pragma_update does NOT use parameter
283        // binding for the PRAGMA value — hex validation is the sole injection defense.
284        let mut pragma_val = format!("x'{key}'");
285        conn.pragma_update(None, "key", &pragma_val)?;
286        pragma_val.zeroize();
287        // Verify the key works by reading from the database
288        conn.execute_batch("SELECT count(*) FROM sqlite_master;")?;
289        let mut storage = Self {
290            conn,
291            metadata_cache: RefCell::new(HashMap::new()),
292        };
293        storage.initialize()?;
294        Ok(storage)
295    }
296
297    /// Open an in-memory database (for tests and ephemeral use).
298    pub fn memory() -> Result<Self> {
299        let conn = Connection::open_in_memory()?;
300        let mut storage = Self {
301            conn,
302            metadata_cache: RefCell::new(HashMap::new()),
303        };
304        storage.initialize()?;
305        Ok(storage)
306    }
307
    /// Initialize the database: WAL mode, SQL helper functions, metadata
    /// tables, config, and schema migrations.
    ///
    /// Runs on every open and is safe to repeat on an existing database:
    /// - the metadata tables are created with `IF NOT EXISTS`;
    /// - the schema-version row uses `INSERT OR IGNORE`;
    /// - the column-adding `ALTER TABLE` ignores its error.
    ///
    /// A repeat run therefore only applies migrations that are still
    /// outstanding.
    ///
    /// # Errors
    /// Returns the first error from any of these steps:
    /// - the `journal_mode` pragma;
    /// - registering `fnv1a_hash`;
    /// - creating the metadata tables;
    /// - writing the schema version;
    /// - any migration.
    fn initialize(&mut self) -> Result<()> {
        // Enable WAL mode for better concurrency.
        // NOTE(review): SQLite only allows MEMORY/OFF journaling for in-memory
        // databases, so this request has no effect for `Storage::memory()`.
        self.conn.pragma_update(None, "journal_mode", "WAL")?;

        // Register FNV-1a hash function for parallel scan segment assignment.
        // Uses get_raw() to borrow directly from SQLite's buffer (zero-copy).
        // It computes the 32-bit FNV-1a hash (offset basis 2166136261, prime
        // 16777619) of the TEXT argument's bytes. The result is returned as a
        // non-negative i64. Any non-TEXT argument, including NULL, is an error.
        // NOTE(review): `hash_in_segment` now assigns segments from the
        // MD5-based `hash_prefix` column; confirm whether SQL still calls this.
        self.conn.create_scalar_function(
            "fnv1a_hash",
            1,
            rusqlite::functions::FunctionFlags::SQLITE_DETERMINISTIC
                | rusqlite::functions::FunctionFlags::SQLITE_UTF8,
            |ctx: &rusqlite::functions::Context| -> rusqlite::Result<i64> {
                let pk_ref = ctx.get_raw(0);
                let pk_bytes = match pk_ref {
                    rusqlite::types::ValueRef::Text(bytes) => bytes,
                    _ => {
                        return Err(rusqlite::Error::InvalidFunctionParameterType(
                            0,
                            rusqlite::types::Type::Text,
                        ));
                    }
                };
                let mut hash: u32 = 2166136261;
                for &byte in pk_bytes {
                    hash ^= byte as u32;
                    hash = hash.wrapping_mul(16777619);
                }
                Ok(hash as i64)
            },
        )?;

        // Create metadata tables (no-ops if they already exist). A fresh
        // database gets the current `_tables` layout directly, including the
        // columns that later migrations add to older databases.
        self.conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS _config (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS _tables (
                table_name TEXT PRIMARY KEY,
                key_schema TEXT NOT NULL,
                attribute_definitions TEXT NOT NULL,
                gsi_definitions TEXT,
                lsi_definitions TEXT,
                stream_enabled INTEGER DEFAULT 0,
                stream_view_type TEXT,
                stream_label TEXT,
                ttl_attribute TEXT,
                ttl_enabled INTEGER DEFAULT 0,
                created_at INTEGER NOT NULL,
                table_status TEXT NOT NULL DEFAULT 'ACTIVE',
                billing_mode TEXT DEFAULT 'PAY_PER_REQUEST',
                provisioned_throughput TEXT,
                tags TEXT,
                sse_specification TEXT,
                table_class TEXT,
                deletion_protection_enabled INTEGER DEFAULT 0
            );

            CREATE TABLE IF NOT EXISTS _stream_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                table_name TEXT NOT NULL,
                event_name TEXT NOT NULL,
                keys_json TEXT NOT NULL,
                new_image TEXT,
                old_image TEXT,
                sequence_number TEXT NOT NULL,
                shard_id TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                user_identity TEXT
            );",
        )?;

        // Migrate: add user_identity column if it doesn't exist (for databases created before Phase 11)
        // Best-effort: the error is deliberately discarded. It is expected
        // ("duplicate column") whenever the column already exists.
        // NOTE(review): this also hides any other ALTER failure.
        let _ = self
            .conn
            .execute_batch("ALTER TABLE _stream_records ADD COLUMN user_identity TEXT");

        // Set schema version if not present. On a brand-new database this
        // stores SCHEMA_VERSION, so none of the migrations below run.
        self.conn.execute(
            "INSERT OR IGNORE INTO _config (key, value) VALUES ('schema_version', ?1)",
            params![SCHEMA_VERSION],
        )?;

        // Run schema migrations.
        // If the version row cannot be read or parsed, fall back to 1 so that
        // every migration runs. That is tolerable because the migrations ignore
        // their ALTER/CREATE INDEX errors for changes already present.
        let version: i32 = self
            .conn
            .query_row(
                "SELECT value FROM _config WHERE key = 'schema_version'",
                [],
                |r| r.get::<_, String>(0),
            )
            .unwrap_or_else(|_| "1".to_string())
            .parse()
            .unwrap_or(1);

        // Apply each outstanding step in order; every step records its own
        // new version in `_config`.
        if version < 2 {
            self.migrate_v1_to_v2()?;
        }
        if version < 3 {
            self.migrate_v2_to_v3()?;
        }
        if version < 4 {
            self.migrate_v3_to_v4()?;
        }
        if version < 5 {
            self.migrate_v4_to_v5()?;
        }
        if version < 6 {
            self.migrate_v5_to_v6()?;
        }

        Ok(())
    }
423
424    /// Migrate from schema v1 to v2: add `cached_at REAL` column to all data tables.
425    fn migrate_v1_to_v2(&self) -> Result<()> {
426        let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
427        let table_names: Vec<String> = stmt
428            .query_map([], |row| row.get(0))?
429            .collect::<std::result::Result<Vec<_>, _>>()?;
430
431        for table_name in &table_names {
432            let escaped = format!("\"{}\"", table_name.replace('"', "\"\""));
433            let _ = self.conn.execute(
434                &format!("ALTER TABLE {escaped} ADD COLUMN cached_at REAL"),
435                [],
436            );
437        }
438
439        self.conn.execute(
440            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '2')",
441            [],
442        )?;
443
444        Ok(())
445    }
446
447    /// Migrate from schema v2 to v3: add `tags TEXT` column to `_tables`.
448    fn migrate_v2_to_v3(&self) -> Result<()> {
449        let _ = self
450            .conn
451            .execute("ALTER TABLE _tables ADD COLUMN tags TEXT", []);
452
453        self.conn.execute(
454            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '3')",
455            [],
456        )?;
457
458        Ok(())
459    }
460
461    /// Migrate from schema v3 to v4: add SSE, table class, and deletion protection columns.
462    fn migrate_v3_to_v4(&self) -> Result<()> {
463        let _ = self
464            .conn
465            .execute("ALTER TABLE _tables ADD COLUMN sse_specification TEXT", []);
466        let _ = self
467            .conn
468            .execute("ALTER TABLE _tables ADD COLUMN table_class TEXT", []);
469        let _ = self.conn.execute(
470            "ALTER TABLE _tables ADD COLUMN deletion_protection_enabled INTEGER DEFAULT 0",
471            [],
472        );
473
474        self.conn.execute(
475            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '4')",
476            [],
477        )?;
478
479        Ok(())
480    }
481
482    /// Migrate from schema v4 to v5: add secondary indexes on GSI and LSI base-key columns.
483    fn migrate_v4_to_v5(&self) -> Result<()> {
484        let mut stmt = self
485            .conn
486            .prepare("SELECT table_name, gsi_definitions, lsi_definitions FROM _tables")?;
487        let tables: Vec<(String, Option<String>, Option<String>)> = stmt
488            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
489            .collect::<std::result::Result<Vec<_>, _>>()?;
490
491        for (table_name, gsi_json, lsi_json) in &tables {
492            if let Some(json) = gsi_json {
493                if let Ok(gsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
494                    for gsi in &gsis {
495                        if let Some(idx) = gsi.get("IndexName").and_then(|v| v.as_str()) {
496                            let gsi_table = escape_table_name(&format!("{table_name}::gsi::{idx}"));
497                            let idx_name =
498                                escape_table_name(&format!("{table_name}::gsi::{idx}::base_key"));
499                            let _ = self.conn.execute_batch(&format!(
500                                "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{gsi_table}\" (table_pk, table_sk)"
501                            ));
502                        }
503                    }
504                }
505            }
506            if let Some(json) = lsi_json {
507                if let Ok(lsis) = serde_json::from_str::<Vec<serde_json::Value>>(json) {
508                    for lsi in &lsis {
509                        if let Some(idx) = lsi.get("IndexName").and_then(|v| v.as_str()) {
510                            let lsi_table = escape_table_name(&format!("{table_name}::lsi::{idx}"));
511                            let idx_name =
512                                escape_table_name(&format!("{table_name}::lsi::{idx}::base_key"));
513                            let _ = self.conn.execute_batch(&format!(
514                                "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{lsi_table}\" (base_pk, base_sk)"
515                            ));
516                        }
517                    }
518                }
519            }
520        }
521
522        self.conn.execute(
523            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '5')",
524            [],
525        )?;
526
527        Ok(())
528    }
529
530    /// Migrate from schema v5 to v6: add `hash_prefix TEXT` column to all data tables.
531    fn migrate_v5_to_v6(&self) -> Result<()> {
532        let mut stmt = self.conn.prepare("SELECT table_name FROM _tables")?;
533        let table_names: Vec<String> = stmt
534            .query_map([], |row| row.get(0))?
535            .collect::<std::result::Result<Vec<_>, _>>()?;
536
537        for table_name in &table_names {
538            let escaped = escape_table_name(table_name);
539            let _ = self.conn.execute(
540                &format!(
541                    "ALTER TABLE \"{escaped}\" ADD COLUMN hash_prefix TEXT NOT NULL DEFAULT ''"
542                ),
543                [],
544            );
545        }
546
547        self.conn.execute(
548            "INSERT OR REPLACE INTO _config (key, value) VALUES ('schema_version', '6')",
549            [],
550        )?;
551
552        Ok(())
553    }
554
555    /// Get a reference to the underlying connection (for transactions, etc.).
556    pub fn conn(&self) -> &Connection {
557        &self.conn
558    }
559
560    /// Get a mutable reference to the underlying connection.
561    pub fn conn_mut(&mut self) -> &mut Connection {
562        &mut self.conn
563    }
564
565    // -----------------------------------------------------------------------
566    // Table metadata
567    // -----------------------------------------------------------------------
568
569    /// Insert a row into the `_tables` metadata table.
570    pub fn insert_table_metadata(&self, m: &CreateTableMetadata) -> Result<()> {
571        let table_name = m.table_name;
572        self.conn.execute(
573            "INSERT INTO _tables (table_name, key_schema, attribute_definitions, gsi_definitions, \
574             lsi_definitions, provisioned_throughput, created_at, sse_specification, table_class, \
575             deletion_protection_enabled, billing_mode)
576             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
577            params![
578                m.table_name,
579                m.key_schema,
580                m.attribute_definitions,
581                m.gsi_definitions,
582                m.lsi_definitions,
583                m.provisioned_throughput,
584                m.created_at,
585                m.sse_specification,
586                m.table_class,
587                m.deletion_protection_enabled as i32,
588                m.billing_mode,
589            ],
590        )?;
591        self.metadata_cache.borrow_mut().remove(table_name);
592        Ok(())
593    }
594
595    /// Get metadata for a table. Returns None if the table doesn't exist.
596    ///
597    /// Results are cached in memory. The cache is invalidated when metadata
598    /// is modified via `insert_table_metadata`, `delete_table_metadata`,
599    /// `enable_stream`, or `update_ttl_config`.
600    pub fn get_table_metadata(&self, table_name: &str) -> Result<Option<TableMetadata>> {
601        // Check cache first
602        if let Some(cached) = self.metadata_cache.borrow().get(table_name) {
603            return Ok(Some(cached.clone()));
604        }
605
606        let sql = format!("SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE table_name = ?1");
607        let mut stmt = self.conn.prepare(&sql)?;
608
609        let result = stmt.query_row(params![table_name], row_to_metadata);
610
611        match result {
612            Ok(meta) => {
613                self.metadata_cache
614                    .borrow_mut()
615                    .insert(table_name.to_string(), meta.clone());
616                Ok(Some(meta))
617            }
618            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
619            Err(e) => Err(DynoxideError::from(e)),
620        }
621    }
622
623    /// Delete metadata for a table.
624    pub fn delete_table_metadata(&self, table_name: &str) -> Result<bool> {
625        let affected = self.conn.execute(
626            "DELETE FROM _tables WHERE table_name = ?1",
627            params![table_name],
628        )?;
629        self.metadata_cache.borrow_mut().remove(table_name);
630        Ok(affected > 0)
631    }
632
633    /// Update attribute definitions and GSI definitions for a table.
634    pub fn update_table_metadata(
635        &self,
636        table_name: &str,
637        attribute_definitions: &str,
638        gsi_definitions: Option<&str>,
639    ) -> Result<()> {
640        self.conn.execute(
641            "UPDATE _tables SET attribute_definitions = ?1, gsi_definitions = ?2 WHERE table_name = ?3",
642            params![attribute_definitions, gsi_definitions, table_name],
643        )?;
644        self.metadata_cache.borrow_mut().remove(table_name);
645        Ok(())
646    }
647
648    /// Update provisioned throughput for a table.
649    pub fn update_provisioned_throughput(
650        &self,
651        table_name: &str,
652        provisioned_throughput: &str,
653    ) -> Result<()> {
654        self.conn.execute(
655            "UPDATE _tables SET provisioned_throughput = ?1 WHERE table_name = ?2",
656            params![provisioned_throughput, table_name],
657        )?;
658        self.metadata_cache.borrow_mut().remove(table_name);
659        Ok(())
660    }
661
662    /// Clear provisioned throughput for a table (sets to SQL NULL).
663    pub fn clear_provisioned_throughput(&self, table_name: &str) -> Result<()> {
664        self.conn.execute(
665            "UPDATE _tables SET provisioned_throughput = NULL WHERE table_name = ?1",
666            params![table_name],
667        )?;
668        self.metadata_cache.borrow_mut().remove(table_name);
669        Ok(())
670    }
671
672    /// Update billing mode for a table.
673    pub fn update_billing_mode(&self, table_name: &str, billing_mode: &str) -> Result<()> {
674        self.conn.execute(
675            "UPDATE _tables SET billing_mode = ?1 WHERE table_name = ?2",
676            params![billing_mode, table_name],
677        )?;
678        self.metadata_cache.borrow_mut().remove(table_name);
679        Ok(())
680    }
681
682    // -----------------------------------------------------------------------
683    // Tag operations
684    // -----------------------------------------------------------------------
685
686    /// Get tags for a table.
687    pub fn get_tags(&self, table_name: &str) -> Result<Vec<crate::types::Tag>> {
688        let tags_json: Option<String> = self.conn.query_row(
689            "SELECT tags FROM _tables WHERE table_name = ?1",
690            params![table_name],
691            |row| row.get(0),
692        )?;
693
694        match tags_json {
695            Some(json) => serde_json::from_str(&json)
696                .map_err(|e| DynoxideError::InternalServerError(format!("Bad tags JSON: {e}"))),
697            None => Ok(Vec::new()),
698        }
699    }
700
701    /// Set (merge) tags on a table. New keys overwrite existing keys.
702    pub fn set_tags(&self, table_name: &str, new_tags: &[crate::types::Tag]) -> Result<()> {
703        use std::collections::BTreeMap;
704
705        let existing = self.get_tags(table_name)?;
706        let mut tag_map: BTreeMap<String, String> =
707            existing.into_iter().map(|t| (t.key, t.value)).collect();
708
709        for tag in new_tags {
710            tag_map.insert(tag.key.clone(), tag.value.clone());
711        }
712
713        if tag_map.len() > 50 {
714            return Err(DynoxideError::ValidationException(
715                "One or more parameter values were invalid: \
716                 Too many tags: tag limit is 50"
717                    .to_string(),
718            ));
719        }
720
721        let merged: Vec<crate::types::Tag> = tag_map
722            .into_iter()
723            .map(|(k, v)| crate::types::Tag { key: k, value: v })
724            .collect();
725
726        let json = serde_json::to_string(&merged)
727            .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
728
729        self.conn.execute(
730            "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
731            params![json, table_name],
732        )?;
733        Ok(())
734    }
735
736    /// Update the deletion protection setting for a table.
737    pub fn update_deletion_protection(&self, table_name: &str, enabled: bool) -> Result<()> {
738        self.conn.execute(
739            "UPDATE _tables SET deletion_protection_enabled = ?1 WHERE table_name = ?2",
740            params![enabled as i32, table_name],
741        )?;
742        self.metadata_cache.borrow_mut().remove(table_name);
743        Ok(())
744    }
745
746    /// Remove tags by key from a table.
747    pub fn remove_tags(&self, table_name: &str, keys: &[String]) -> Result<()> {
748        let mut tags = self.get_tags(table_name)?;
749        tags.retain(|t| !keys.contains(&t.key));
750
751        let json = if tags.is_empty() {
752            None
753        } else {
754            Some(
755                serde_json::to_string(&tags)
756                    .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
757            )
758        };
759
760        self.conn.execute(
761            "UPDATE _tables SET tags = ?1 WHERE table_name = ?2",
762            params![json, table_name],
763        )?;
764        Ok(())
765    }
766
767    /// List all table names.
768    pub fn list_table_names(&self) -> Result<Vec<String>> {
769        let mut stmt = self
770            .conn
771            .prepare("SELECT table_name FROM _tables ORDER BY table_name")?;
772        let names = stmt
773            .query_map([], |row| row.get(0))?
774            .collect::<std::result::Result<Vec<String>, _>>()?;
775        Ok(names)
776    }
777
778    /// Check if a table exists in metadata.
779    pub fn table_exists(&self, table_name: &str) -> Result<bool> {
780        let count: i32 = self.conn.query_row(
781            "SELECT COUNT(*) FROM _tables WHERE table_name = ?1",
782            params![table_name],
783            |row| row.get(0),
784        )?;
785        Ok(count > 0)
786    }
787
788    /// Invalidate the cached metadata for a specific table.
789    #[allow(dead_code)]
790    pub(crate) fn invalidate_metadata_cache(&self, table_name: &str) {
791        self.metadata_cache.borrow_mut().remove(table_name);
792    }
793
794    // -----------------------------------------------------------------------
795    // Dynamic data tables
796    // -----------------------------------------------------------------------
797
798    /// Create a data table for a DynamoDB table.
799    pub fn create_data_table(&self, table_name: &str) -> Result<()> {
800        let sql = format!(
801            "CREATE TABLE \"{}\" (
802                pk TEXT NOT NULL,
803                sk TEXT NOT NULL DEFAULT '',
804                item_json TEXT NOT NULL,
805                item_size INTEGER NOT NULL,
806                cached_at REAL,
807                hash_prefix TEXT NOT NULL DEFAULT '',
808                PRIMARY KEY (pk, sk)
809            )",
810            escape_table_name(table_name)
811        );
812        self.conn.execute(&sql, [])?;
813        Ok(())
814    }
815
816    /// Drop a data table.
817    pub fn drop_data_table(&self, table_name: &str) -> Result<()> {
818        let sql = format!("DROP TABLE IF EXISTS \"{}\"", escape_table_name(table_name));
819        self.conn.execute(&sql, [])?;
820        Ok(())
821    }
822
823    /// Create a GSI table.
824    pub fn create_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
825        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
826        let escaped = escape_table_name(&gsi_table_name);
827        let sql = format!(
828            "CREATE TABLE \"{escaped}\" (
829                gsi_pk TEXT NOT NULL,
830                gsi_sk TEXT NOT NULL DEFAULT '',
831                table_pk TEXT NOT NULL,
832                table_sk TEXT NOT NULL DEFAULT '',
833                item_json TEXT NOT NULL,
834                PRIMARY KEY (gsi_pk, gsi_sk, table_pk, table_sk)
835            )"
836        );
837        self.conn.execute(&sql, [])?;
838
839        let idx_name = escape_table_name(&format!("{gsi_table_name}::base_key"));
840        self.conn.execute_batch(&format!(
841            "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{escaped}\" (table_pk, table_sk)"
842        ))?;
843        Ok(())
844    }
845
846    /// Drop a GSI table.
847    pub fn drop_gsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
848        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
849        let sql = format!(
850            "DROP TABLE IF EXISTS \"{}\"",
851            escape_table_name(&gsi_table_name)
852        );
853        self.conn.execute(&sql, [])?;
854        Ok(())
855    }
856
857    // -----------------------------------------------------------------------
858    // GSI item operations
859    // -----------------------------------------------------------------------
860
861    /// Insert an item into a GSI table.
862    #[allow(clippy::too_many_arguments)]
863    pub fn insert_gsi_item(
864        &self,
865        table_name: &str,
866        index_name: &str,
867        gsi_pk: &str,
868        gsi_sk: &str,
869        table_pk: &str,
870        table_sk: &str,
871        item_json: &str,
872    ) -> Result<()> {
873        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
874        let sql = format!(
875            "INSERT OR REPLACE INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
876            escape_table_name(&gsi_table_name)
877        );
878        self.conn
879            .prepare_cached(&sql)?
880            .execute(params![gsi_pk, gsi_sk, table_pk, table_sk, item_json])?;
881        Ok(())
882    }
883
884    /// Delete an item from a GSI table by base table primary key.
885    pub fn delete_gsi_item(
886        &self,
887        table_name: &str,
888        index_name: &str,
889        table_pk: &str,
890        table_sk: &str,
891    ) -> Result<()> {
892        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
893        let sql = format!(
894            "DELETE FROM \"{}\" WHERE table_pk = ?1 AND table_sk = ?2",
895            escape_table_name(&gsi_table_name)
896        );
897        self.conn
898            .prepare_cached(&sql)?
899            .execute(params![table_pk, table_sk])?;
900        Ok(())
901    }
902
903    /// Query items from a GSI table.
904    pub fn query_gsi_items(
905        &self,
906        table_name: &str,
907        index_name: &str,
908        gsi_pk: &str,
909        params: &QueryParams,
910    ) -> Result<Vec<(String, String, String)>> {
911        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
912        let mut sql = format!(
913            "SELECT gsi_pk, gsi_sk, item_json FROM \"{}\" WHERE gsi_pk = ?1",
914            escape_table_name(&gsi_table_name)
915        );
916
917        let mut param_idx = 2;
918        let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> =
919            vec![Box::new(gsi_pk.to_string())];
920
921        if let Some(cond) = params.sk_condition {
922            // Replace "sk" with "gsi_sk" in the condition.
923            // N.B. This string replacement works because all SQL fragments generated
924            // by key_condition use the form ` sk ` or ` sk>` — they never embed
925            // `sk` without surrounding whitespace/operator. A more robust solution
926            // would thread the column name through the key condition builder, but
927            // that is a larger refactor deferred for now.
928            let gsi_cond = cond.replace(" sk ", " gsi_sk ").replace(" sk>", " gsi_sk>");
929            sql.push(' ');
930            sql.push_str(&gsi_cond);
931            for &p in params.sk_params {
932                all_params.push(Box::new(p.to_string()));
933                param_idx += 1;
934            }
935        }
936
937        // For GSI pagination, use a composite cursor (gsi_sk, table_pk, table_sk)
938        // so that hash-only GSIs (where gsi_sk is always '') paginate correctly.
939        if let (Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
940            params.exclusive_start_sk,
941            params.exclusive_start_base_pk,
942            params.exclusive_start_base_sk,
943        ) {
944            let op = if params.forward { ">" } else { "<" };
945            sql.push_str(&format!(
946                " AND (gsi_sk, table_pk, table_sk) {op} (?{}, ?{}, ?{})",
947                param_idx,
948                param_idx + 1,
949                param_idx + 2
950            ));
951            all_params.push(Box::new(start_sk.to_string()));
952            all_params.push(Box::new(start_base_pk.to_string()));
953            all_params.push(Box::new(start_base_sk.to_string()));
954        } else if let Some(start_sk) = params.exclusive_start_sk {
955            if params.forward {
956                sql.push_str(&format!(" AND gsi_sk > ?{param_idx}"));
957            } else {
958                sql.push_str(&format!(" AND gsi_sk < ?{param_idx}"));
959            }
960            all_params.push(Box::new(start_sk.to_string()));
961        }
962
963        sql.push_str(if params.forward {
964            " ORDER BY gsi_sk ASC, table_pk ASC, table_sk ASC"
965        } else {
966            " ORDER BY gsi_sk DESC, table_pk DESC, table_sk DESC"
967        });
968
969        if let Some(lim) = params.limit {
970            sql.push_str(&format!(" LIMIT {lim}"));
971        }
972
973        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
974            all_params.iter().map(|p| p.as_ref()).collect();
975        let mut stmt = self.conn.prepare(&sql)?;
976        let rows = stmt
977            .query_map(param_refs.as_slice(), |row| {
978                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
979            })?
980            .collect::<std::result::Result<Vec<_>, _>>()?;
981
982        Ok(rows)
983    }
984
985    /// Scan all items from a GSI table.
986    pub fn scan_gsi_items(
987        &self,
988        table_name: &str,
989        index_name: &str,
990        params: &ScanParams,
991    ) -> Result<Vec<(String, String, String)>> {
992        let gsi_table_name = format!("{table_name}::gsi::{index_name}");
993        let mut sql = format!(
994            "SELECT gsi_pk, gsi_sk, item_json FROM \"{}\"",
995            escape_table_name(&gsi_table_name)
996        );
997
998        let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
999        let mut where_clauses = Vec::new();
1000        let mut param_idx = 1;
1001
1002        if let (Some(start_pk), Some(start_sk)) =
1003            (params.exclusive_start_pk, params.exclusive_start_sk)
1004        {
1005            where_clauses.push(format!(
1006                "(gsi_pk, gsi_sk) > (?{}, ?{})",
1007                param_idx,
1008                param_idx + 1
1009            ));
1010            params_vec.push(Box::new(start_pk.to_string()));
1011            params_vec.push(Box::new(start_sk.to_string()));
1012            param_idx += 2;
1013        }
1014
1015        // For GSI parallel scan, hash on the base table pk (table_pk column)
1016        if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
1017            where_clauses.push(format!(
1018                "(fnv1a_hash(table_pk) % ?{}) = ?{}",
1019                param_idx,
1020                param_idx + 1
1021            ));
1022            params_vec.push(Box::new(total as i64));
1023            params_vec.push(Box::new(seg as i64));
1024        }
1025
1026        if !where_clauses.is_empty() {
1027            sql.push_str(" WHERE ");
1028            sql.push_str(&where_clauses.join(" AND "));
1029        }
1030
1031        sql.push_str(" ORDER BY gsi_pk ASC, gsi_sk ASC");
1032
1033        if let Some(lim) = params.limit {
1034            sql.push_str(&format!(" LIMIT {lim}"));
1035        }
1036
1037        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1038            params_vec.iter().map(|p| p.as_ref()).collect();
1039        let mut stmt = self.conn.prepare(&sql)?;
1040        let rows = stmt
1041            .query_map(param_refs.as_slice(), |row| {
1042                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1043            })?
1044            .collect::<std::result::Result<Vec<_>, _>>()?;
1045
1046        Ok(rows)
1047    }
1048
1049    // -----------------------------------------------------------------------
1050    // LSI table operations
1051    // -----------------------------------------------------------------------
1052
1053    /// Create an LSI table for a given base table and index name.
1054    pub fn create_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1055        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1056        let escaped = escape_table_name(&lsi_table_name);
1057        let sql = format!(
1058            "CREATE TABLE \"{escaped}\" (
1059                pk TEXT NOT NULL,
1060                sk TEXT NOT NULL DEFAULT '',
1061                base_pk TEXT NOT NULL,
1062                base_sk TEXT NOT NULL DEFAULT '',
1063                item_json TEXT NOT NULL,
1064                PRIMARY KEY (pk, sk, base_pk, base_sk)
1065            )"
1066        );
1067        self.conn.execute(&sql, [])?;
1068
1069        let idx_name = escape_table_name(&format!("{lsi_table_name}::base_key"));
1070        self.conn.execute_batch(&format!(
1071            "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{escaped}\" (base_pk, base_sk)"
1072        ))?;
1073        Ok(())
1074    }
1075
1076    /// Drop an LSI table.
1077    pub fn drop_lsi_table(&self, table_name: &str, index_name: &str) -> Result<()> {
1078        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1079        let sql = format!(
1080            "DROP TABLE IF EXISTS \"{}\"",
1081            escape_table_name(&lsi_table_name)
1082        );
1083        self.conn.execute(&sql, [])?;
1084        Ok(())
1085    }
1086
1087    // -----------------------------------------------------------------------
1088    // LSI item operations
1089    // -----------------------------------------------------------------------
1090
1091    /// Insert an item into an LSI table.
1092    #[allow(clippy::too_many_arguments)]
1093    pub fn insert_lsi_item(
1094        &self,
1095        table_name: &str,
1096        index_name: &str,
1097        pk: &str,
1098        sk: &str,
1099        base_pk: &str,
1100        base_sk: &str,
1101        item_json: &str,
1102    ) -> Result<()> {
1103        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1104        let sql = format!(
1105            "INSERT OR REPLACE INTO \"{}\" (pk, sk, base_pk, base_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
1106            escape_table_name(&lsi_table_name)
1107        );
1108        self.conn
1109            .prepare_cached(&sql)?
1110            .execute(params![pk, sk, base_pk, base_sk, item_json])?;
1111        Ok(())
1112    }
1113
1114    /// Delete an item from an LSI table by base table primary key.
1115    pub fn delete_lsi_item(
1116        &self,
1117        table_name: &str,
1118        index_name: &str,
1119        base_pk: &str,
1120        base_sk: &str,
1121    ) -> Result<()> {
1122        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1123        let sql = format!(
1124            "DELETE FROM \"{}\" WHERE base_pk = ?1 AND base_sk = ?2",
1125            escape_table_name(&lsi_table_name)
1126        );
1127        self.conn
1128            .prepare_cached(&sql)?
1129            .execute(params![base_pk, base_sk])?;
1130        Ok(())
1131    }
1132
1133    /// Query items from an LSI table.
1134    pub fn query_lsi_items(
1135        &self,
1136        table_name: &str,
1137        index_name: &str,
1138        pk: &str,
1139        params: &QueryParams,
1140    ) -> Result<Vec<(String, String, String)>> {
1141        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1142        let mut sql = format!(
1143            "SELECT pk, sk, item_json FROM \"{}\" WHERE pk = ?1",
1144            escape_table_name(&lsi_table_name)
1145        );
1146
1147        let mut param_idx = 2;
1148        let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(pk.to_string())];
1149
1150        if let Some(cond) = params.sk_condition {
1151            sql.push(' ');
1152            sql.push_str(cond);
1153            for &p in params.sk_params {
1154                all_params.push(Box::new(p.to_string()));
1155                param_idx += 1;
1156            }
1157        }
1158
1159        // Use composite cursor when all three LSI pagination values are present,
1160        // otherwise fall back to simple sk comparison.
1161        if let (Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
1162            params.exclusive_start_sk,
1163            params.exclusive_start_base_pk,
1164            params.exclusive_start_base_sk,
1165        ) {
1166            let op = if params.forward { ">" } else { "<" };
1167            sql.push_str(&format!(
1168                " AND (sk, base_pk, base_sk) {op} (?{}, ?{}, ?{})",
1169                param_idx,
1170                param_idx + 1,
1171                param_idx + 2
1172            ));
1173            all_params.push(Box::new(start_sk.to_string()));
1174            all_params.push(Box::new(start_base_pk.to_string()));
1175            all_params.push(Box::new(start_base_sk.to_string()));
1176        } else if let Some(start_sk) = params.exclusive_start_sk {
1177            if params.forward {
1178                sql.push_str(&format!(" AND sk > ?{param_idx}"));
1179            } else {
1180                sql.push_str(&format!(" AND sk < ?{param_idx}"));
1181            }
1182            all_params.push(Box::new(start_sk.to_string()));
1183        }
1184
1185        sql.push_str(if params.forward {
1186            " ORDER BY sk ASC, base_pk ASC, base_sk ASC"
1187        } else {
1188            " ORDER BY sk DESC, base_pk DESC, base_sk DESC"
1189        });
1190
1191        if let Some(lim) = params.limit {
1192            sql.push_str(&format!(" LIMIT {lim}"));
1193        }
1194
1195        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1196            all_params.iter().map(|p| p.as_ref()).collect();
1197        let mut stmt = self.conn.prepare(&sql)?;
1198        let rows = stmt
1199            .query_map(param_refs.as_slice(), |row| {
1200                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1201            })?
1202            .collect::<std::result::Result<Vec<_>, _>>()?;
1203
1204        Ok(rows)
1205    }
1206
1207    /// Scan all items from an LSI table.
1208    pub fn scan_lsi_items(
1209        &self,
1210        table_name: &str,
1211        index_name: &str,
1212        params: &ScanParams,
1213    ) -> Result<Vec<(String, String, String)>> {
1214        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1215        let mut sql = format!(
1216            "SELECT pk, sk, item_json FROM \"{}\"",
1217            escape_table_name(&lsi_table_name)
1218        );
1219
1220        let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1221        let mut where_clauses = Vec::new();
1222        let mut param_idx = 1;
1223
1224        // Use composite cursor when base key values are present for correct LSI pagination.
1225        if let (Some(start_pk), Some(start_sk), Some(start_base_pk), Some(start_base_sk)) = (
1226            params.exclusive_start_pk,
1227            params.exclusive_start_sk,
1228            params.exclusive_start_base_pk,
1229            params.exclusive_start_base_sk,
1230        ) {
1231            where_clauses.push(format!(
1232                "(pk, sk, base_pk, base_sk) > (?{}, ?{}, ?{}, ?{})",
1233                param_idx,
1234                param_idx + 1,
1235                param_idx + 2,
1236                param_idx + 3
1237            ));
1238            params_vec.push(Box::new(start_pk.to_string()));
1239            params_vec.push(Box::new(start_sk.to_string()));
1240            params_vec.push(Box::new(start_base_pk.to_string()));
1241            params_vec.push(Box::new(start_base_sk.to_string()));
1242            param_idx += 4;
1243        } else if let (Some(start_pk), Some(start_sk)) =
1244            (params.exclusive_start_pk, params.exclusive_start_sk)
1245        {
1246            where_clauses.push(format!("(pk, sk) > (?{}, ?{})", param_idx, param_idx + 1));
1247            params_vec.push(Box::new(start_pk.to_string()));
1248            params_vec.push(Box::new(start_sk.to_string()));
1249            param_idx += 2;
1250        }
1251
1252        // For LSI parallel scan, hash on the base table pk (base_pk column)
1253        if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
1254            where_clauses.push(format!(
1255                "(fnv1a_hash(base_pk) % ?{}) = ?{}",
1256                param_idx,
1257                param_idx + 1
1258            ));
1259            params_vec.push(Box::new(total as i64));
1260            params_vec.push(Box::new(seg as i64));
1261        }
1262
1263        if !where_clauses.is_empty() {
1264            sql.push_str(" WHERE ");
1265            sql.push_str(&where_clauses.join(" AND "));
1266        }
1267
1268        sql.push_str(" ORDER BY pk ASC, sk ASC");
1269
1270        if let Some(lim) = params.limit {
1271            sql.push_str(&format!(" LIMIT {lim}"));
1272        }
1273
1274        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1275            params_vec.iter().map(|p| p.as_ref()).collect();
1276        let mut stmt = self.conn.prepare(&sql)?;
1277        let rows = stmt
1278            .query_map(param_refs.as_slice(), |row| {
1279                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1280            })?
1281            .collect::<std::result::Result<Vec<_>, _>>()?;
1282
1283        Ok(rows)
1284    }
1285
1286    // -----------------------------------------------------------------------
1287    // Transaction support
1288    // -----------------------------------------------------------------------
1289
1290    /// Begin an immediate SQLite transaction.
1291    pub fn begin_transaction(&self) -> Result<()> {
1292        self.conn.execute_batch("BEGIN IMMEDIATE")?;
1293        Ok(())
1294    }
1295
1296    /// Commit the current transaction.
1297    pub fn commit(&self) -> Result<()> {
1298        self.conn.execute_batch("COMMIT")?;
1299        Ok(())
1300    }
1301
1302    /// Rollback the current transaction.
1303    pub fn rollback(&self) -> Result<()> {
1304        self.conn.execute_batch("ROLLBACK")?;
1305        Ok(())
1306    }
1307
1308    /// Set aggressive PRAGMAs for bulk loading.
1309    ///
1310    /// Disables fsync, increases cache, and enables memory-mapped I/O.
1311    /// Only safe when data loss on crash is acceptable (e.g., fresh import
1312    /// that can be re-run).
1313    pub fn enable_bulk_loading(&self) -> Result<()> {
1314        self.conn.execute_batch(
1315            "PRAGMA synchronous = OFF;
1316             PRAGMA cache_size = -64000;
1317             PRAGMA temp_store = MEMORY;
1318             PRAGMA mmap_size = 268435456;",
1319        )?;
1320        Ok(())
1321    }
1322
1323    /// Restore normal PRAGMAs after bulk loading.
1324    pub fn disable_bulk_loading(&self) -> Result<()> {
1325        self.conn.execute_batch(
1326            "PRAGMA synchronous = NORMAL;
1327             PRAGMA cache_size = -2000;
1328             PRAGMA temp_store = DEFAULT;
1329             PRAGMA mmap_size = 0;",
1330        )?;
1331        Ok(())
1332    }
1333
1334    // -----------------------------------------------------------------------
1335    // Item CRUD
1336    // -----------------------------------------------------------------------
1337
1338    /// Insert or replace an item.
1339    pub fn put_item(
1340        &self,
1341        table_name: &str,
1342        pk: &str,
1343        sk: &str,
1344        item_json: &str,
1345        item_size: usize,
1346    ) -> Result<Option<String>> {
1347        self.put_item_with_hash(table_name, pk, sk, item_json, item_size, "")
1348    }
1349
1350    /// Put an item with an explicit hash prefix for parallel scan ordering.
1351    pub fn put_item_with_hash(
1352        &self,
1353        table_name: &str,
1354        pk: &str,
1355        sk: &str,
1356        item_json: &str,
1357        item_size: usize,
1358        hash_prefix: &str,
1359    ) -> Result<Option<String>> {
1360        // First, try to get the old item for return value
1361        let old_item = self.get_item(table_name, pk, sk)?;
1362
1363        let escaped = escape_table_name(table_name);
1364        let sql = format!(
1365            "INSERT OR REPLACE INTO \"{escaped}\" (pk, sk, item_json, item_size, cached_at, hash_prefix) \
1366             VALUES (?1, ?2, ?3, ?4, \
1367             (SELECT cached_at FROM \"{escaped}\" WHERE pk = ?1 AND sk = ?2), ?5)"
1368        );
1369        self.conn.execute(
1370            &sql,
1371            params![pk, sk, item_json, item_size as i64, hash_prefix],
1372        )?;
1373
1374        Ok(old_item)
1375    }
1376
1377    /// Get a single item by primary key.
1378    pub fn get_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1379        let sql = format!(
1380            "SELECT item_json FROM \"{}\" WHERE pk = ?1 AND sk = ?2",
1381            escape_table_name(table_name)
1382        );
1383        let result = self.conn.query_row(&sql, params![pk, sk], |row| row.get(0));
1384
1385        match result {
1386            Ok(json) => Ok(Some(json)),
1387            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
1388            Err(e) => Err(DynoxideError::from(e)),
1389        }
1390    }
1391
1392    /// Return the total item_size for all items sharing the given partition key.
1393    pub fn get_partition_size(&self, table_name: &str, pk: &str) -> Result<i64> {
1394        let sql = format!(
1395            "SELECT COALESCE(SUM(item_size), 0) FROM \"{}\" WHERE pk = ?1",
1396            escape_table_name(table_name)
1397        );
1398        let size: i64 = self.conn.query_row(&sql, params![pk], |row| row.get(0))?;
1399        Ok(size)
1400    }
1401
1402    /// Return the total size of LSI items for a given partition key.
1403    /// LSI items are stored as JSON text, so we use length(item_json).
1404    pub fn get_lsi_partition_size(
1405        &self,
1406        table_name: &str,
1407        index_name: &str,
1408        pk: &str,
1409    ) -> Result<i64> {
1410        let lsi_table_name = format!("{table_name}::lsi::{index_name}");
1411        let sql = format!(
1412            "SELECT COALESCE(SUM(length(item_json)), 0) FROM \"{}\" WHERE pk = ?1",
1413            escape_table_name(&lsi_table_name)
1414        );
1415        let size: i64 = self.conn.query_row(&sql, params![pk], |row| row.get(0))?;
1416        Ok(size)
1417    }
1418
1419    /// Delete an item by primary key. Returns the old item_json if it existed.
1420    pub fn delete_item(&self, table_name: &str, pk: &str, sk: &str) -> Result<Option<String>> {
1421        let old_item = self.get_item(table_name, pk, sk)?;
1422
1423        let sql = format!(
1424            "DELETE FROM \"{}\" WHERE pk = ?1 AND sk = ?2",
1425            escape_table_name(table_name)
1426        );
1427        self.conn.execute(&sql, params![pk, sk])?;
1428
1429        Ok(old_item)
1430    }
1431
1432    /// Query items by partition key with optional sort key condition.
1433    ///
1434    /// `sk_condition` is a SQL fragment like `AND sk > ?` with `sk_params` providing values.
1435    /// Returns `(items, has_more)` where items is a vec of `(pk, sk, item_json)`.
1436    pub fn query_items(
1437        &self,
1438        table_name: &str,
1439        pk: &str,
1440        params: &QueryParams,
1441    ) -> Result<Vec<(String, String, String)>> {
1442        let mut sql = format!(
1443            "SELECT pk, sk, item_json FROM \"{}\" WHERE pk = ?1",
1444            escape_table_name(table_name)
1445        );
1446
1447        let mut param_idx = 2;
1448        let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(pk.to_string())];
1449
1450        if let Some(cond) = params.sk_condition {
1451            sql.push(' ');
1452            sql.push_str(cond);
1453            for &p in params.sk_params {
1454                all_params.push(Box::new(p.to_string()));
1455                param_idx += 1;
1456            }
1457        }
1458
1459        if let Some(start_sk) = params.exclusive_start_sk {
1460            if params.forward {
1461                sql.push_str(&format!(" AND sk > ?{param_idx}"));
1462            } else {
1463                sql.push_str(&format!(" AND sk < ?{param_idx}"));
1464            }
1465            all_params.push(Box::new(start_sk.to_string()));
1466        }
1467
1468        sql.push_str(if params.forward {
1469            " ORDER BY sk ASC"
1470        } else {
1471            " ORDER BY sk DESC"
1472        });
1473
1474        if let Some(lim) = params.limit {
1475            sql.push_str(&format!(" LIMIT {lim}"));
1476        }
1477
1478        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1479            all_params.iter().map(|p| p.as_ref()).collect();
1480        let mut stmt = self.conn.prepare(&sql)?;
1481        let rows = stmt
1482            .query_map(param_refs.as_slice(), |row| {
1483                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1484            })?
1485            .collect::<std::result::Result<Vec<_>, _>>()?;
1486
1487        Ok(rows)
1488    }
1489
    /// Scan items from a table with pagination.
    ///
    /// Returns `(pk, sk, item_json)` tuples.
    /// - Regular scans are ordered by `(pk, sk)`.
    /// - Parallel scans (`segment` and `total_segments` both set) are ordered by
    ///   `(hash_prefix, pk, sk)` for dynalite-compatible behaviour. They only
    ///   include rows whose hash bucket (the first 3 hex chars of `hash_prefix`)
    ///   falls in the requested segment's bucket range.
    ///
    /// When `exclusive_start_pk` and `exclusive_start_sk` are both set, the scan
    /// resumes strictly after that key in the active ordering. At most
    /// `params.limit` rows are returned when a limit is set.
    pub fn scan_items(
        &self,
        table_name: &str,
        params: &ScanParams,
    ) -> Result<Vec<(String, String, String)>> {
        let mut sql = format!(
            "SELECT pk, sk, item_json FROM \"{}\"",
            escape_table_name(table_name)
        );

        // Bind values in placeholder order (?1, ?2, ...).
        let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
        let mut where_clauses = Vec::new();
        // Next unused placeholder number.
        let mut param_idx = 1;

        // For parallel scan with hash_prefix-based segment filtering
        let is_parallel = params.segment.is_some() && params.total_segments.is_some();

        if let (Some(start_pk), Some(start_sk)) =
            (params.exclusive_start_pk, params.exclusive_start_sk)
        {
            if is_parallel {
                // For parallel scans, pagination must respect hash_prefix ordering.
                // The start row's hash_prefix is looked up by the sub-select, and
                // the same ?N / ?N+1 placeholders are reused in the row-value
                // comparison, so only two values are bound.
                // NOTE(review): if the start row has been deleted, the sub-select
                // yields NULL, the row-value comparison is NULL for every row,
                // and the scan ends early instead of resuming.
                where_clauses.push(format!(
                    "(hash_prefix, pk, sk) > ((SELECT hash_prefix FROM \"{}\" WHERE pk = ?{} AND sk = ?{} LIMIT 1), ?{}, ?{})",
                    escape_table_name(table_name),
                    param_idx, param_idx + 1,
                    param_idx, param_idx + 1
                ));
            } else {
                where_clauses.push(format!("(pk, sk) > (?{}, ?{})", param_idx, param_idx + 1));
            }
            params_vec.push(Box::new(start_pk.to_string()));
            params_vec.push(Box::new(start_sk.to_string()));
            param_idx += 2;
        }

        if let (Some(seg), Some(total)) = (params.segment, params.total_segments) {
            // Use hash_prefix column for segment assignment.
            // Bucket = parseInt(hash_prefix[0..3], 16).
            // Segment owns buckets from ceil(4096*seg/total) to ceil(4096*(seg+1)/total)-1.
            let start_bucket = ceiling_div(HASH_BUCKETS * seg, total);
            let end_bucket = ceiling_div(HASH_BUCKETS * (seg + 1), total) - 1;
            // Zero-padded lowercase 3-digit hex, so plain string comparison in
            // SQL matches numeric bucket order.
            let start_hex = format!("{:03x}", start_bucket);
            let end_hex = format!("{:03x}", end_bucket);
            // hash_prefix is a 6-char hex string; compare the first 3 chars
            // NOTE(review): a row with an empty hash_prefix (the column default,
            // and what `put_item` writes) yields substr(...) = '', which is below
            // every bound, so it is never returned by any parallel segment.
            where_clauses.push(format!(
                "substr(hash_prefix, 1, 3) >= ?{} AND substr(hash_prefix, 1, 3) <= ?{}",
                param_idx,
                param_idx + 1
            ));
            params_vec.push(Box::new(start_hex));
            params_vec.push(Box::new(end_hex));
        }

        if !where_clauses.is_empty() {
            sql.push_str(" WHERE ");
            sql.push_str(&where_clauses.join(" AND "));
        }

        // For parallel scans, order by hash_prefix for dynalite-compatible behaviour.
        // For regular scans, use pk/sk ordering.
        if is_parallel {
            sql.push_str(" ORDER BY hash_prefix ASC, pk ASC, sk ASC");
        } else {
            sql.push_str(" ORDER BY pk ASC, sk ASC");
        }

        if let Some(lim) = params.limit {
            sql.push_str(&format!(" LIMIT {lim}"));
        }

        let param_refs: Vec<&dyn rusqlite::types::ToSql> =
            params_vec.iter().map(|p| p.as_ref()).collect();
        let mut stmt = self.conn.prepare(&sql)?;
        let rows = stmt
            .query_map(param_refs.as_slice(), |row| {
                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
            })?
            .collect::<std::result::Result<Vec<_>, _>>()?;

        Ok(rows)
    }
1576
1577    /// Count items in a table.
1578    pub fn count_items(&self, table_name: &str) -> Result<i64> {
1579        let sql = format!("SELECT COUNT(*) FROM \"{}\"", escape_table_name(table_name));
1580        let count: i64 = self.conn.query_row(&sql, [], |row| row.get(0))?;
1581        Ok(count)
1582    }
1583
1584    // -----------------------------------------------------------------------
1585    // Introspection
1586    // -----------------------------------------------------------------------
1587
1588    /// Get the database file path, or `None` for in-memory databases.
1589    pub fn db_path(&self) -> Option<String> {
1590        self.conn
1591            .path()
1592            .filter(|p| !p.is_empty())
1593            .map(|p| p.to_owned())
1594    }
1595
1596    /// Get the total database size in bytes.
1597    pub fn db_size_bytes(&self) -> Result<u64> {
1598        let size: i64 = self.conn.query_row(
1599            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1600            [],
1601            |row| row.get(0),
1602        )?;
1603        Ok(size as u64)
1604    }
1605
1606    /// Count the number of DynamoDB tables.
1607    pub fn table_count(&self) -> Result<usize> {
1608        let count: i64 = self
1609            .conn
1610            .query_row("SELECT COUNT(*) FROM _tables", [], |row| row.get(0))?;
1611        Ok(count as usize)
1612    }
1613
1614    /// Get per-table statistics: name, item count, and approximate size in bytes.
1615    ///
1616    /// Uses a single query per table (COUNT + SUM combined) instead of separate queries.
1617    pub fn table_stats(&self) -> Result<Vec<TableStats>> {
1618        let table_names = self.list_table_names()?;
1619        let mut stats = Vec::with_capacity(table_names.len());
1620        for name in table_names {
1621            let sql = format!(
1622                "SELECT COUNT(*), COALESCE(SUM(item_size), 0) FROM \"{}\"",
1623                escape_table_name(&name)
1624            );
1625            let (item_count, size_bytes): (i64, i64) = self
1626                .conn
1627                .query_row(&sql, [], |row| Ok((row.get(0)?, row.get(1)?)))?;
1628            stats.push(TableStats {
1629                table_name: name,
1630                item_count,
1631                size_bytes: size_bytes as u64,
1632            });
1633        }
1634        Ok(stats)
1635    }
1636
1637    /// Get combined database info in a single call for atomic consistency.
1638    ///
1639    /// Returns all introspection data that `get_database_info` tools need
1640    /// without releasing the lock between queries.
1641    pub fn database_info(&self) -> Result<DatabaseInfo> {
1642        let path = self.db_path();
1643        let size_bytes = self.db_size_bytes()?;
1644        let table_count = self.table_count()?;
1645        let stats = self.table_stats()?;
1646
1647        let mut table_details = Vec::with_capacity(stats.len());
1648        for s in stats {
1649            let metadata = self.get_table_metadata(&s.table_name)?;
1650            table_details.push(TableInfoEntry { stats: s, metadata });
1651        }
1652
1653        Ok(DatabaseInfo {
1654            path,
1655            size_bytes,
1656            table_count,
1657            tables: table_details,
1658        })
1659    }
1660
1661    // -----------------------------------------------------------------------
1662    // Snapshot operations
1663    // -----------------------------------------------------------------------
1664
1665    /// Create a snapshot of the database by copying it to the given path.
1666    /// Uses `VACUUM INTO` which works for both in-memory and file-backed databases.
1667    pub fn vacuum_into(&self, path: &str) -> Result<()> {
1668        if path.contains('\0') {
1669            return Err(DynoxideError::ValidationException(
1670                "path contains null byte".to_string(),
1671            ));
1672        }
1673        self.conn
1674            .execute_batch(&format!("VACUUM INTO '{}'", path.replace('\'', "''")))?;
1675        Ok(())
1676    }
1677
1678    /// Run VACUUM to compact the database file in place.
1679    pub fn vacuum(&self) -> Result<()> {
1680        self.conn.execute_batch("VACUUM")?;
1681        Ok(())
1682    }
1683
1684    /// Restore the database from a snapshot file using SQLite's backup API.
1685    /// This replaces the current database contents with the snapshot contents.
1686    /// Works for both in-memory and file-backed databases.
1687    pub fn restore_from(&mut self, path: &str) -> Result<()> {
1688        let source = Connection::open(path)?;
1689        self.restore_from_connection(&source)
1690    }
1691
1692    /// Backup the current database to a new in-memory SQLite connection.
1693    ///
1694    /// Used for in-memory snapshot storage — the returned connection holds
1695    /// a complete copy of the database without touching the filesystem.
1696    pub fn backup_to_memory(&self) -> Result<Connection> {
1697        let mut dest = Connection::open_in_memory()?;
1698        {
1699            let backup = rusqlite::backup::Backup::new(&self.conn, &mut dest)?;
1700            backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1701        }
1702        Ok(dest)
1703    }
1704
1705    /// Restore the database from another SQLite connection using the backup API.
1706    ///
1707    /// Replaces the current database contents with the source connection's
1708    /// contents. Invalidates the metadata cache.
1709    pub fn restore_from_connection(&mut self, source: &Connection) -> Result<()> {
1710        let backup = rusqlite::backup::Backup::new(source, &mut self.conn)?;
1711        backup.run_to_completion(100, std::time::Duration::from_millis(0), None)?;
1712        self.metadata_cache.borrow_mut().clear();
1713        Ok(())
1714    }
1715
1716    /// Get the database size in bytes for an arbitrary connection.
1717    pub fn connection_size_bytes(conn: &Connection) -> Result<u64> {
1718        let size: i64 = conn.query_row(
1719            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
1720            [],
1721            |row| row.get(0),
1722        )?;
1723        Ok(size as u64)
1724    }
1725
1726    // -----------------------------------------------------------------------
1727    // Stream operations
1728    // -----------------------------------------------------------------------
1729
1730    /// Enable streams on a table.
1731    pub fn enable_stream(&self, table_name: &str, view_type: &str, label: &str) -> Result<()> {
1732        self.conn.execute(
1733            "UPDATE _tables SET stream_enabled = 1, stream_view_type = ?1, stream_label = ?2 WHERE table_name = ?3",
1734            params![view_type, label, table_name],
1735        )?;
1736        self.metadata_cache.borrow_mut().remove(table_name);
1737        Ok(())
1738    }
1739
1740    /// Disable streams on a table.
1741    pub fn disable_stream(&self, table_name: &str) -> Result<()> {
1742        self.conn.execute(
1743            "UPDATE _tables SET stream_enabled = 0 WHERE table_name = ?1",
1744            params![table_name],
1745        )?;
1746        self.metadata_cache.borrow_mut().remove(table_name);
1747        Ok(())
1748    }
1749
1750    /// Insert a stream record.
1751    #[allow(clippy::too_many_arguments)]
1752    pub fn insert_stream_record(
1753        &self,
1754        table_name: &str,
1755        event_name: &str,
1756        keys_json: &str,
1757        new_image: Option<&str>,
1758        old_image: Option<&str>,
1759        sequence_number: &str,
1760        shard_id: &str,
1761        created_at: i64,
1762    ) -> Result<()> {
1763        self.insert_stream_record_with_identity(
1764            table_name,
1765            event_name,
1766            keys_json,
1767            new_image,
1768            old_image,
1769            sequence_number,
1770            shard_id,
1771            created_at,
1772            None,
1773        )
1774    }
1775
1776    /// Insert a stream record with optional user identity (for TTL deletions).
1777    #[allow(clippy::too_many_arguments)]
1778    pub fn insert_stream_record_with_identity(
1779        &self,
1780        table_name: &str,
1781        event_name: &str,
1782        keys_json: &str,
1783        new_image: Option<&str>,
1784        old_image: Option<&str>,
1785        sequence_number: &str,
1786        shard_id: &str,
1787        created_at: i64,
1788        user_identity: Option<&str>,
1789    ) -> Result<()> {
1790        self.conn.execute(
1791            "INSERT INTO _stream_records (table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity)
1792             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1793            params![table_name, event_name, keys_json, new_image, old_image, sequence_number, shard_id, created_at, user_identity],
1794        )?;
1795        Ok(())
1796    }
1797
1798    /// Get the next sequence number for a table's stream.
1799    pub fn next_stream_sequence_number(&self, table_name: &str) -> Result<i64> {
1800        let result: std::result::Result<i64, _> = self.conn.query_row(
1801            "SELECT COALESCE(MAX(CAST(sequence_number AS INTEGER)), 0) + 1 FROM _stream_records WHERE table_name = ?1",
1802            params![table_name],
1803            |row| row.get(0),
1804        );
1805        match result {
1806            Ok(n) => Ok(n),
1807            Err(_) => Ok(1),
1808        }
1809    }
1810
1811    /// Get stream records for a shard starting after a given sequence number.
1812    pub fn get_stream_records(
1813        &self,
1814        table_name: &str,
1815        shard_id: &str,
1816        after_sequence: i64,
1817        limit: usize,
1818    ) -> Result<Vec<StreamRecord>> {
1819        let mut stmt = self.conn.prepare(
1820            "SELECT event_name, keys_json, new_image, old_image, sequence_number, created_at, user_identity
1821             FROM _stream_records
1822             WHERE table_name = ?1 AND shard_id = ?2 AND CAST(sequence_number AS INTEGER) > ?3
1823             ORDER BY CAST(sequence_number AS INTEGER) ASC
1824             LIMIT ?4",
1825        )?;
1826        let rows = stmt
1827            .query_map(
1828                params![table_name, shard_id, after_sequence, limit as i64],
1829                |row| {
1830                    Ok(StreamRecord {
1831                        event_name: row.get(0)?,
1832                        keys_json: row.get(1)?,
1833                        new_image: row.get(2)?,
1834                        old_image: row.get(3)?,
1835                        sequence_number: row.get(4)?,
1836                        created_at: row.get(5)?,
1837                        user_identity: row.get(6)?,
1838                    })
1839                },
1840            )?
1841            .collect::<std::result::Result<Vec<_>, _>>()?;
1842        Ok(rows)
1843    }
1844
1845    /// List tables that have streams enabled.
1846    pub fn list_stream_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1847        let sql = format!(
1848            "SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE stream_enabled = 1 ORDER BY table_name"
1849        );
1850        let mut stmt = self.conn.prepare(&sql)?;
1851        let rows = stmt
1852            .query_map([], row_to_metadata)?
1853            .collect::<std::result::Result<Vec<_>, _>>()?;
1854        Ok(rows)
1855    }
1856
1857    // -----------------------------------------------------------------------
1858    // TTL operations
1859    // -----------------------------------------------------------------------
1860
1861    /// Update TTL configuration for a table.
1862    pub fn update_ttl_config(
1863        &self,
1864        table_name: &str,
1865        attribute_name: Option<&str>,
1866        enabled: bool,
1867    ) -> Result<()> {
1868        self.conn.execute(
1869            "UPDATE _tables SET ttl_attribute = ?1, ttl_enabled = ?2 WHERE table_name = ?3",
1870            params![attribute_name, enabled as i32, table_name],
1871        )?;
1872        self.metadata_cache.borrow_mut().remove(table_name);
1873        Ok(())
1874    }
1875
1876    /// List tables that have TTL enabled.
1877    pub fn list_ttl_enabled_tables(&self) -> Result<Vec<TableMetadata>> {
1878        let sql = format!(
1879            "SELECT {TABLE_METADATA_COLUMNS} FROM _tables WHERE ttl_enabled = 1 ORDER BY table_name"
1880        );
1881        let mut stmt = self.conn.prepare(&sql)?;
1882        let rows = stmt
1883            .query_map([], row_to_metadata)?
1884            .collect::<std::result::Result<Vec<_>, _>>()?;
1885        Ok(rows)
1886    }
1887
1888    /// Get the min and max sequence numbers for a shard.
1889    pub fn get_shard_sequence_range(
1890        &self,
1891        table_name: &str,
1892        shard_id: &str,
1893    ) -> Result<(Option<String>, Option<String>)> {
1894        let result: std::result::Result<(Option<String>, Option<String>), _> = self.conn.query_row(
1895            "SELECT MIN(sequence_number), MAX(sequence_number) FROM _stream_records WHERE table_name = ?1 AND shard_id = ?2",
1896            params![table_name, shard_id],
1897            |row| Ok((row.get(0)?, row.get(1)?)),
1898        );
1899        match result {
1900            Ok(range) => Ok(range),
1901            Err(_) => Ok((None, None)),
1902        }
1903    }
1904
1905    // -----------------------------------------------------------------------
1906    // Cache tracking (cached_at)
1907    // -----------------------------------------------------------------------
1908
1909    /// Update the `cached_at` timestamp for a single item.
1910    pub fn touch_cached_at(
1911        &self,
1912        table_name: &str,
1913        pk: &str,
1914        sk: &str,
1915        timestamp: f64,
1916    ) -> Result<()> {
1917        let sql = format!(
1918            "UPDATE \"{}\" SET cached_at = ?1 WHERE pk = ?2 AND sk = ?3",
1919            escape_table_name(table_name)
1920        );
1921        self.conn.execute(&sql, params![timestamp, pk, sk])?;
1922        Ok(())
1923    }
1924
1925    /// Get items ordered by `cached_at` (oldest first) for LRU eviction.
1926    ///
1927    /// Returns `(pk, sk, item_size)` tuples. Items with NULL `cached_at`
1928    /// are excluded (they were never cached from a remote source).
1929    pub fn get_lru_items(
1930        &self,
1931        table_name: &str,
1932        limit: usize,
1933    ) -> Result<Vec<(String, String, i64)>> {
1934        let sql = format!(
1935            "SELECT pk, sk, item_size FROM \"{}\" WHERE cached_at IS NOT NULL ORDER BY cached_at ASC LIMIT ?1",
1936            escape_table_name(table_name)
1937        );
1938        let mut stmt = self.conn.prepare(&sql)?;
1939        let rows = stmt
1940            .query_map(params![limit as i64], |row| {
1941                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
1942            })?
1943            .collect::<std::result::Result<Vec<_>, _>>()?;
1944        Ok(rows)
1945    }
1946}
1947
/// A stream record from the `_stream_records` table.
///
/// Produced by `Storage::get_stream_records`, which fills each field from the
/// column of the same name.
#[derive(Debug, Clone)]
pub struct StreamRecord {
    /// `event_name` column, as passed to `insert_stream_record*`.
    pub event_name: String,
    /// `keys_json` column (presumably the item's key attributes serialized as
    /// JSON; TODO confirm against the writer).
    pub keys_json: String,
    /// `new_image` column; `None` when the stored value is NULL.
    pub new_image: Option<String>,
    /// `old_image` column; `None` when the stored value is NULL.
    pub old_image: Option<String>,
    /// Sequence number, stored as a string; `get_stream_records` filters and
    /// orders by `CAST(sequence_number AS INTEGER)`.
    pub sequence_number: String,
    /// `created_at` value supplied at insert time (NOTE(review): units not
    /// visible here; confirm whether epoch seconds or milliseconds).
    pub created_at: i64,
    /// `user_identity` column; `None` unless the record was written through
    /// `insert_stream_record_with_identity` with an identity (e.g. TTL deletions).
    pub user_identity: Option<String>,
}
1959
/// Per-table statistics returned by `Storage::table_stats()`.
#[derive(Debug, Clone)]
pub struct TableStats {
    /// Table name, as returned by `list_table_names`.
    pub table_name: String,
    /// Number of rows in the table's data table (`COUNT(*)`).
    pub item_count: i64,
    /// Sum of the `item_size` column over all items; `0` for an empty table.
    pub size_bytes: u64,
}
1967
/// Combined database introspection info returned by `Storage::database_info()`.
#[derive(Debug, Clone)]
pub struct DatabaseInfo {
    /// Database file path from `db_path()`; `None` for in-memory databases.
    pub path: Option<String>,
    /// Total database size in bytes (`page_count * page_size`).
    pub size_bytes: u64,
    /// Number of rows in the `_tables` metadata table.
    pub table_count: usize,
    /// Stats plus metadata for each table, in `table_stats()` order.
    pub tables: Vec<TableInfoEntry>,
}
1976
/// Per-table stats + metadata for `DatabaseInfo`.
#[derive(Debug, Clone)]
pub struct TableInfoEntry {
    /// Item count and summed item size for the table.
    pub stats: TableStats,
    /// Result of `get_table_metadata` for the table; `None` if no metadata row
    /// was found.
    pub metadata: Option<TableMetadata>,
}
1983
/// Escape a table name for use inside a double-quoted SQL identifier.
///
/// Every `"` is doubled, following SQL identifier-quoting rules. The caller
/// supplies the surrounding quotes.
pub(crate) fn escape_table_name(name: &str) -> String {
    let mut escaped = String::with_capacity(name.len());
    for ch in name.chars() {
        if ch == '"' {
            escaped.push_str("\"\"");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
1988
/// Metadata row from the `_tables` table.
///
/// Built by `row_to_metadata` from the columns listed in
/// `TABLE_METADATA_COLUMNS`, which follow the same order as the fields below.
///
/// Note: The `tags` column is intentionally excluded. Tags are not on the hot
/// path for item operations and are accessed via separate `get_tags`/`set_tags`
/// methods to keep the metadata cache lean.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// Table name; the key used by the `WHERE table_name = ?` updates.
    pub table_name: String,
    /// Key schema as JSON text, e.g. `[{"AttributeName":"pk","KeyType":"HASH"}]`.
    pub key_schema: String,
    /// Attribute definitions as JSON text, e.g.
    /// `[{"AttributeName":"pk","AttributeType":"S"}]`.
    pub attribute_definitions: String,
    /// Serialized global secondary index definitions, if any (format not
    /// visible here; presumably JSON like the fields above, TODO confirm).
    pub gsi_definitions: Option<String>,
    /// Serialized local secondary index definitions, if any (format not
    /// visible here; TODO confirm).
    pub lsi_definitions: Option<String>,
    /// Stored as 0/1; set by `enable_stream`, cleared by `disable_stream`.
    pub stream_enabled: bool,
    /// Stream view type recorded by `enable_stream` (e.g. `NEW_AND_OLD_IMAGES`).
    pub stream_view_type: Option<String>,
    /// Stream label recorded by `enable_stream`.
    pub stream_label: Option<String>,
    /// TTL attribute name written by `update_ttl_config`.
    pub ttl_attribute: Option<String>,
    /// TTL flag (stored as 0/1) written by `update_ttl_config`.
    pub ttl_enabled: bool,
    /// Creation timestamp supplied at insert (NOTE(review): units not visible
    /// here; confirm).
    pub created_at: i64,
    /// Table status string (e.g. `ACTIVE`).
    pub table_status: String,
    /// Billing mode, if recorded.
    pub billing_mode: Option<String>,
    /// Provisioned throughput settings, serialized (format not visible here).
    pub provisioned_throughput: Option<String>,
    /// SSE specification, serialized (format not visible here).
    pub sse_specification: Option<String>,
    /// Table class, if recorded.
    pub table_class: Option<String>,
    /// Deletion protection flag. `row_to_metadata` reads it leniently and
    /// treats a value it cannot read as an integer (e.g. NULL) as `false`.
    pub deletion_protection_enabled: bool,
}
2014
/// Column list shared by every `SELECT` on `_tables`.
///
/// Order matters: `row_to_metadata` reads these columns by position
/// (0 = `table_name` through 16 = `deletion_protection_enabled`), so any change
/// here must be mirrored there.
const TABLE_METADATA_COLUMNS: &str = concat!(
    "table_name, key_schema, attribute_definitions, gsi_definitions, ",
    "lsi_definitions, stream_enabled, stream_view_type, stream_label, ttl_attribute, ttl_enabled, ",
    "created_at, table_status, billing_mode, provisioned_throughput, ",
    "sse_specification, table_class, deletion_protection_enabled",
);
2020
2021/// Map a row from the _tables SELECT to a TableMetadata struct.
2022fn row_to_metadata(row: &rusqlite::Row) -> rusqlite::Result<TableMetadata> {
2023    Ok(TableMetadata {
2024        table_name: row.get(0)?,
2025        key_schema: row.get(1)?,
2026        attribute_definitions: row.get(2)?,
2027        gsi_definitions: row.get(3)?,
2028        lsi_definitions: row.get(4)?,
2029        stream_enabled: row.get::<_, i32>(5)? != 0,
2030        stream_view_type: row.get(6)?,
2031        stream_label: row.get(7)?,
2032        ttl_attribute: row.get(8)?,
2033        ttl_enabled: row.get::<_, i32>(9)? != 0,
2034        created_at: row.get(10)?,
2035        table_status: row.get(11)?,
2036        billing_mode: row.get(12)?,
2037        provisioned_throughput: row.get(13)?,
2038        sse_specification: row.get(14)?,
2039        table_class: row.get(15)?,
2040        deletion_protection_enabled: row.get::<_, i32>(16).unwrap_or(0) != 0,
2041    })
2042}
2043
2044#[cfg(test)]
2045mod tests {
2046    use super::*;
2047
2048    fn test_storage() -> Storage {
2049        Storage::memory().expect("Failed to create in-memory storage")
2050    }
2051
2052    #[test]
2053    fn test_initialize_creates_metadata_tables() {
2054        let storage = test_storage();
2055        // _config and _tables should exist
2056        let version: String = storage
2057            .conn()
2058            .query_row(
2059                "SELECT value FROM _config WHERE key = 'schema_version'",
2060                [],
2061                |row| row.get(0),
2062            )
2063            .unwrap();
2064        assert_eq!(version, SCHEMA_VERSION);
2065    }
2066
2067    #[test]
2068    fn test_wal_mode_enabled() {
2069        let storage = test_storage();
2070        let mode: String = storage
2071            .conn()
2072            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
2073            .unwrap();
2074        // In-memory databases may report "memory" instead of "wal"
2075        assert!(mode == "wal" || mode == "memory", "Got mode: {mode}");
2076    }
2077
2078    #[test]
2079    fn test_table_metadata_crud() {
2080        let storage = test_storage();
2081
2082        // Initially no tables
2083        assert!(!storage.table_exists("TestTable").unwrap());
2084        assert!(storage.list_table_names().unwrap().is_empty());
2085
2086        // Insert metadata
2087        storage
2088            .insert_table_metadata(&CreateTableMetadata {
2089                table_name: "TestTable",
2090                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2091                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2092                created_at: 1000000,
2093                ..Default::default()
2094            })
2095            .unwrap();
2096
2097        assert!(storage.table_exists("TestTable").unwrap());
2098        assert_eq!(storage.list_table_names().unwrap(), vec!["TestTable"]);
2099
2100        // Get metadata
2101        let meta = storage.get_table_metadata("TestTable").unwrap().unwrap();
2102        assert_eq!(meta.table_name, "TestTable");
2103        assert_eq!(meta.table_status, "ACTIVE");
2104        assert_eq!(meta.created_at, 1000000);
2105
2106        // Delete metadata
2107        assert!(storage.delete_table_metadata("TestTable").unwrap());
2108        assert!(!storage.table_exists("TestTable").unwrap());
2109    }
2110
2111    #[test]
2112    fn test_create_and_drop_data_table() {
2113        let storage = test_storage();
2114        storage.create_data_table("MyTable").unwrap();
2115
2116        // Should be able to insert into it
2117        storage
2118            .put_item("MyTable", "pk1", "", r#"{"pk":{"S":"pk1"}}"#, 10)
2119            .unwrap();
2120
2121        let item = storage.get_item("MyTable", "pk1", "").unwrap();
2122        assert!(item.is_some());
2123
2124        storage.drop_data_table("MyTable").unwrap();
2125    }
2126
2127    #[test]
2128    fn test_item_crud() {
2129        let storage = test_storage();
2130        storage.create_data_table("Items").unwrap();
2131
2132        // Put item
2133        let old = storage
2134            .put_item(
2135                "Items",
2136                "user#1",
2137                "profile",
2138                r#"{"name":{"S":"Alice"}}"#,
2139                20,
2140            )
2141            .unwrap();
2142        assert!(old.is_none()); // No previous item
2143
2144        // Get item
2145        let item = storage.get_item("Items", "user#1", "profile").unwrap();
2146        assert_eq!(item.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2147
2148        // Replace item (returns old)
2149        let old = storage
2150            .put_item("Items", "user#1", "profile", r#"{"name":{"S":"Bob"}}"#, 18)
2151            .unwrap();
2152        assert_eq!(old.unwrap(), r#"{"name":{"S":"Alice"}}"#);
2153
2154        // Delete item
2155        let deleted = storage.delete_item("Items", "user#1", "profile").unwrap();
2156        assert_eq!(deleted.unwrap(), r#"{"name":{"S":"Bob"}}"#);
2157
2158        // Item should be gone
2159        assert!(
2160            storage
2161                .get_item("Items", "user#1", "profile")
2162                .unwrap()
2163                .is_none()
2164        );
2165    }
2166
2167    #[test]
2168    fn test_query_items() {
2169        let storage = test_storage();
2170        storage.create_data_table("Orders").unwrap();
2171
2172        // Insert several items for the same partition key
2173        for i in 1..=5 {
2174            let sk = format!("order#{i:03}");
2175            let json = format!(r#"{{"id":{{"N":"{i}"}}}}"#);
2176            storage
2177                .put_item("Orders", "user#1", &sk, &json, 10)
2178                .unwrap();
2179        }
2180
2181        // Query all for partition key
2182        let results = storage
2183            .query_items(
2184                "Orders",
2185                "user#1",
2186                &QueryParams {
2187                    forward: true,
2188                    ..Default::default()
2189                },
2190            )
2191            .unwrap();
2192        assert_eq!(results.len(), 5);
2193        assert_eq!(results[0].1, "order#001"); // Sorted ascending
2194
2195        // Query with limit
2196        let results = storage
2197            .query_items(
2198                "Orders",
2199                "user#1",
2200                &QueryParams {
2201                    forward: true,
2202                    limit: Some(2),
2203                    ..Default::default()
2204                },
2205            )
2206            .unwrap();
2207        assert_eq!(results.len(), 2);
2208
2209        // Query reverse
2210        let results = storage
2211            .query_items(
2212                "Orders",
2213                "user#1",
2214                &QueryParams {
2215                    forward: false,
2216                    limit: Some(2),
2217                    ..Default::default()
2218                },
2219            )
2220            .unwrap();
2221        assert_eq!(results.len(), 2);
2222        assert_eq!(results[0].1, "order#005"); // Sorted descending
2223    }
2224
2225    #[test]
2226    fn test_scan_items() {
2227        let storage = test_storage();
2228        storage.create_data_table("ScanTest").unwrap();
2229
2230        storage.put_item("ScanTest", "a", "1", r#"{}"#, 2).unwrap();
2231        storage.put_item("ScanTest", "b", "2", r#"{}"#, 2).unwrap();
2232        storage.put_item("ScanTest", "c", "3", r#"{}"#, 2).unwrap();
2233
2234        let results = storage.scan_items("ScanTest", &Default::default()).unwrap();
2235        assert_eq!(results.len(), 3);
2236
2237        // Scan with limit
2238        let results = storage
2239            .scan_items(
2240                "ScanTest",
2241                &ScanParams {
2242                    limit: Some(2),
2243                    ..Default::default()
2244                },
2245            )
2246            .unwrap();
2247        assert_eq!(results.len(), 2);
2248
2249        // Scan with pagination
2250        let results = storage
2251            .scan_items(
2252                "ScanTest",
2253                &ScanParams {
2254                    limit: Some(2),
2255                    exclusive_start_pk: Some("a"),
2256                    exclusive_start_sk: Some("1"),
2257                    ..Default::default()
2258                },
2259            )
2260            .unwrap();
2261        assert_eq!(results.len(), 2);
2262        assert_eq!(results[0].0, "b"); // Skipped "a"
2263    }
2264
2265    #[test]
2266    fn test_count_items() {
2267        let storage = test_storage();
2268        storage.create_data_table("CountTest").unwrap();
2269
2270        assert_eq!(storage.count_items("CountTest").unwrap(), 0);
2271
2272        storage.put_item("CountTest", "a", "", r#"{}"#, 2).unwrap();
2273        storage.put_item("CountTest", "b", "", r#"{}"#, 2).unwrap();
2274
2275        assert_eq!(storage.count_items("CountTest").unwrap(), 2);
2276    }
2277
2278    #[test]
2279    fn test_gsi_table_lifecycle() {
2280        let storage = test_storage();
2281        storage.create_gsi_table("Orders", "ByDate").unwrap();
2282
2283        // Should be able to write to the GSI table via raw SQL
2284        let gsi_name = "Orders::gsi::ByDate";
2285        let sql = format!(
2286            "INSERT INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) VALUES (?1, ?2, ?3, ?4, ?5)",
2287            gsi_name.replace('"', "\"\"")
2288        );
2289        storage
2290            .conn()
2291            .execute(
2292                &sql,
2293                params!["2024-01-01", "001", "user#1", "order#001", r#"{}"#],
2294            )
2295            .unwrap();
2296
2297        storage.drop_gsi_table("Orders", "ByDate").unwrap();
2298    }
2299
2300    #[test]
2301    fn test_nonexistent_table_metadata() {
2302        let storage = test_storage();
2303        assert!(storage.get_table_metadata("Nonexistent").unwrap().is_none());
2304        assert!(!storage.delete_table_metadata("Nonexistent").unwrap());
2305    }
2306
2307    #[test]
2308    fn test_metadata_cache_hit() {
2309        let storage = test_storage();
2310        storage
2311            .insert_table_metadata(&CreateTableMetadata {
2312                table_name: "CacheTest",
2313                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2314                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2315                created_at: 1000000,
2316                ..Default::default()
2317            })
2318            .unwrap();
2319
2320        // First call populates cache
2321        let meta1 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2322        assert_eq!(meta1.table_name, "CacheTest");
2323
2324        // Second call should hit cache (same result)
2325        let meta2 = storage.get_table_metadata("CacheTest").unwrap().unwrap();
2326        assert_eq!(meta2.table_name, "CacheTest");
2327        assert_eq!(meta1.created_at, meta2.created_at);
2328
2329        // Cache should have the entry
2330        assert!(storage.metadata_cache.borrow().contains_key("CacheTest"));
2331    }
2332
2333    #[test]
2334    fn test_metadata_cache_invalidated_on_delete() {
2335        let storage = test_storage();
2336        storage
2337            .insert_table_metadata(&CreateTableMetadata {
2338                table_name: "DelCache",
2339                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2340                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2341                created_at: 1000000,
2342                ..Default::default()
2343            })
2344            .unwrap();
2345
2346        // Populate cache
2347        storage.get_table_metadata("DelCache").unwrap();
2348        assert!(storage.metadata_cache.borrow().contains_key("DelCache"));
2349
2350        // Delete should invalidate cache
2351        storage.delete_table_metadata("DelCache").unwrap();
2352        assert!(!storage.metadata_cache.borrow().contains_key("DelCache"));
2353    }
2354
2355    #[test]
2356    fn test_metadata_cache_invalidated_on_stream_enable() {
2357        let storage = test_storage();
2358        storage
2359            .insert_table_metadata(&CreateTableMetadata {
2360                table_name: "StreamCache",
2361                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2362                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2363                created_at: 1000000,
2364                ..Default::default()
2365            })
2366            .unwrap();
2367
2368        // Populate cache
2369        let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2370        assert!(!meta.stream_enabled);
2371
2372        // Enable stream should invalidate cache
2373        storage
2374            .enable_stream("StreamCache", "NEW_AND_OLD_IMAGES", "2024-01-01T00:00:00")
2375            .unwrap();
2376        assert!(!storage.metadata_cache.borrow().contains_key("StreamCache"));
2377
2378        // Next get should reflect the change
2379        let meta = storage.get_table_metadata("StreamCache").unwrap().unwrap();
2380        assert!(meta.stream_enabled);
2381    }
2382
2383    #[test]
2384    fn test_metadata_cache_invalidated_on_ttl_update() {
2385        let storage = test_storage();
2386        storage
2387            .insert_table_metadata(&CreateTableMetadata {
2388                table_name: "TtlCache",
2389                key_schema: r#"[{"AttributeName":"pk","KeyType":"HASH"}]"#,
2390                attribute_definitions: r#"[{"AttributeName":"pk","AttributeType":"S"}]"#,
2391                created_at: 1000000,
2392                ..Default::default()
2393            })
2394            .unwrap();
2395
2396        // Populate cache
2397        let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2398        assert!(!meta.ttl_enabled);
2399
2400        // Update TTL should invalidate cache
2401        storage
2402            .update_ttl_config("TtlCache", Some("expires_at"), true)
2403            .unwrap();
2404        assert!(!storage.metadata_cache.borrow().contains_key("TtlCache"));
2405
2406        // Next get should reflect the change
2407        let meta = storage.get_table_metadata("TtlCache").unwrap().unwrap();
2408        assert!(meta.ttl_enabled);
2409        assert_eq!(meta.ttl_attribute, Some("expires_at".to_string()));
2410    }
2411
2412    #[test]
2413    fn test_num_to_buffer_zero() {
2414        // numToBuffer("0") → [0x80]
2415        assert_eq!(num_to_buffer("0"), vec![0x80]);
2416        assert_eq!(num_to_buffer("-0"), vec![0x80]);
2417    }
2418
2419    #[test]
2420    fn test_hash_prefix_string_keys() {
2421        // Verify known hash prefixes for string keys used in scan tests.
2422        // These specific values determine which segment items land in.
2423        let h1 = compute_hash_prefix(&AttributeValue::S("3635".into()));
2424        let h2 = compute_hash_prefix(&AttributeValue::S("228".into()));
2425        let h3 = compute_hash_prefix(&AttributeValue::S("1668".into()));
2426        let h4 = compute_hash_prefix(&AttributeValue::S("3435".into()));
2427
2428        // With TotalSegments=4096, segment 0 owns bucket 0 only.
2429        // Items "3635" and "228" must be in segment 0 (bucket 0).
2430        assert_eq!(
2431            hash_bucket(&h1),
2432            0,
2433            "3635 should be bucket 0, got hash {h1}"
2434        );
2435        assert_eq!(hash_bucket(&h2), 0, "228 should be bucket 0, got hash {h2}");
2436
2437        // "1668" must be in segment 1 (bucket 1)
2438        assert_eq!(
2439            hash_bucket(&h3),
2440            1,
2441            "1668 should be bucket 1, got hash {h3}"
2442        );
2443
2444        // "3435" must be in segment 4 (bucket 4)
2445        assert_eq!(
2446            hash_bucket(&h4),
2447            4,
2448            "3435 should be bucket 4, got hash {h4}"
2449        );
2450    }
2451
2452    #[test]
2453    fn test_hash_prefix_number_keys() {
2454        // Verify number key hash prefixes from scan tests.
2455        // "251" must be in segment 1 (bucket 1) with TotalSegments=4096
2456        let h1 = compute_hash_prefix(&AttributeValue::N("251".into()));
2457        assert_eq!(hash_bucket(&h1), 1, "251 should be bucket 1, got hash {h1}");
2458
2459        // "2388" must be in segment 4095 (bucket 4095)
2460        let h2 = compute_hash_prefix(&AttributeValue::N("2388".into()));
2461        assert_eq!(
2462            hash_bucket(&h2),
2463            4095,
2464            "2388 should be bucket 4095, got hash {h2}"
2465        );
2466    }
2467
2468    #[test]
2469    fn test_hash_in_segment() {
2470        // bucket 0 should be in segment 0 of 4096
2471        assert!(hash_in_segment("000000", 0, 4096));
2472        assert!(!hash_in_segment("000000", 1, 4096));
2473
2474        // bucket 1 should be in segment 1 of 4096
2475        assert!(hash_in_segment("001000", 1, 4096));
2476        assert!(!hash_in_segment("001000", 0, 4096));
2477
2478        // bucket 4095 should be in segment 4095 of 4096
2479        assert!(hash_in_segment("fff000", 4095, 4096));
2480        assert!(!hash_in_segment("fff000", 0, 4096));
2481
2482        // With 2 segments: buckets 0-2047 in segment 0, 2048-4095 in segment 1
2483        assert!(hash_in_segment("000000", 0, 2));
2484        assert!(hash_in_segment("7ff000", 0, 2));
2485        assert!(hash_in_segment("800000", 1, 2));
2486        assert!(hash_in_segment("fff000", 1, 2));
2487    }
2488}