next_plaid/
filtering.rs

//! SQLite-based metadata filtering for next-plaid indices.
//!
//! This module provides functionality for storing, querying, and managing
//! document metadata using SQLite, enabling efficient filtering during search.
//!
//! The API matches fast-plaid's `filtering.py` for compatibility.
//!
//! # Example
//!
//! ```ignore
//! use next_plaid::filtering;
//! use serde_json::json;
//!
//! // Create metadata for documents
//! let metadata = vec![
//!     json!({"name": "Alice", "category": "A", "score": 95}),
//!     json!({"name": "Bob", "category": "B", "score": 87}),
//! ];
//! let doc_ids: Vec<i64> = (0..metadata.len() as i64).collect();
//!
//! // Create metadata database
//! filtering::create("my_index", &metadata, &doc_ids)?;
//!
//! // Query documents matching a condition
//! let subset = filtering::where_condition(
//!     "my_index",
//!     "category = ? AND score > ?",
//!     &[json!("A"), json!(90)],
//! )?;
//!
//! // Use subset in search
//! let results = index.search(&query, &params, Some(&subset))?;
//! ```

use std::collections::HashMap;
use std::fs;
use std::path::Path;

use regex::Regex;
use rusqlite::{params_from_iter, Connection, Result as SqliteResult, ToSql};
use serde_json::Value;

use crate::error::{Error, Result};

/// Database file name within the index directory.
const METADATA_DB_NAME: &str = "metadata.db";

/// Primary key column name (matches fast-plaid).
const SUBSET_COLUMN: &str = "_subset_";

/// Validate that a column name is a safe SQL identifier.
///
/// Column names must start with a letter or underscore, followed by
/// letters, digits, or underscores. This prevents SQL injection.
fn is_valid_column_name(name: &str) -> bool {
    column_name_regex().is_match(name)
}

fn column_name_regex() -> &'static Regex {
    use std::sync::OnceLock;
    static REGEX: OnceLock<Regex> = OnceLock::new();
    REGEX.get_or_init(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap())
}

/// Infer SQL type from a JSON value.
fn infer_sql_type(value: &Value) -> &'static str {
    match value {
        Value::Number(n) => {
            if n.is_i64() || n.is_u64() {
                "INTEGER"
            } else {
                "REAL"
            }
        }
        Value::Bool(_) => "INTEGER",
        Value::String(_) => "TEXT",
        Value::Null => "TEXT",
        Value::Array(_) | Value::Object(_) => "BLOB",
    }
}

/// Convert a JSON value to a type that can be bound to SQLite.
fn json_to_sql(value: &Value) -> Box<dyn ToSql> {
    match value {
        Value::Null => Box::new(None::<String>),
        Value::Bool(b) => Box::new(if *b { 1i64 } else { 0i64 }),
        Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                Box::new(i)
            } else if let Some(f) = n.as_f64() {
                Box::new(f)
            } else {
                Box::new(n.to_string())
            }
        }
        Value::String(s) => Box::new(s.clone()),
        Value::Array(_) | Value::Object(_) => Box::new(serde_json::to_string(value).unwrap()),
    }
}

/// Get the path to the metadata database for an index.
fn get_db_path(index_path: &str) -> std::path::PathBuf {
    Path::new(index_path).join(METADATA_DB_NAME)
}

/// Check if a metadata database exists for the given index.
pub fn exists(index_path: &str) -> bool {
    get_db_path(index_path).exists()
}

/// Create a new SQLite metadata database, replacing any existing one.
///
/// Each element in `metadata` is a JSON object representing a document's metadata.
/// The `_subset_` column is automatically added as the primary key.
///
/// # Arguments
///
/// * `index_path` - Path to the index directory
/// * `metadata` - Slice of JSON objects, one per document
/// * `doc_ids` - Document IDs to use as `_subset_` values (must match `metadata` length)
///
/// # Returns
///
/// Number of rows inserted
///
/// # Errors
///
/// Returns an error if:
/// - The index directory cannot be created
/// - Column names are invalid (SQL injection prevention)
/// - Database operations fail
/// - `metadata` length doesn't match `doc_ids` length
///
/// # Example
///
/// ```ignore
/// use next_plaid::filtering;
/// use serde_json::json;
///
/// let metadata = vec![
///     json!({"name": "Alice", "age": 30}),
///     json!({"name": "Bob", "age": 25, "city": "NYC"}),
/// ];
/// let doc_ids: Vec<i64> = (0..2).collect();
///
/// filtering::create("my_index", &metadata, &doc_ids)?;
/// ```
pub fn create(index_path: &str, metadata: &[Value], doc_ids: &[i64]) -> Result<usize> {
    // Validate doc_ids length matches metadata
    if metadata.len() != doc_ids.len() {
        return Err(Error::Filtering(format!(
            "Metadata length ({}) must match doc_ids length ({})",
            metadata.len(),
            doc_ids.len()
        )));
    }

    // Ensure index directory exists
    let index_dir = Path::new(index_path);
    if !index_dir.exists() {
        fs::create_dir_all(index_dir)?;
    }

    // Remove existing database
    let db_path = get_db_path(index_path);
    if db_path.exists() {
        fs::remove_file(&db_path)?;
    }

    if metadata.is_empty() {
        return Ok(0);
    }

    // Collect all unique column names and infer types
    let mut columns: Vec<String> = Vec::new();
    let mut column_types: HashMap<String, &'static str> = HashMap::new();

    for item in metadata {
        if let Value::Object(obj) = item {
            for (key, value) in obj {
                if !columns.contains(key) {
                    // Validate column name
                    if !is_valid_column_name(key) {
                        return Err(Error::Filtering(format!(
                            "Invalid column name '{}'. Column names must start with a letter or \
                             underscore, followed by letters, digits, or underscores.",
                            key
                        )));
                    }
                    columns.push(key.clone());
                }
                // Infer type from first non-null value
                if !value.is_null() && !column_types.contains_key(key) {
                    column_types.insert(key.clone(), infer_sql_type(value));
                }
            }
        }
    }

    // Create connection
    let conn = Connection::open(&db_path)?;

    // Build CREATE TABLE statement
    let mut col_defs = vec![format!("\"{}\" INTEGER PRIMARY KEY", SUBSET_COLUMN)];
    for col in &columns {
        let sql_type = column_types.get(col).copied().unwrap_or("TEXT");
        col_defs.push(format!("\"{}\" {}", col, sql_type));
    }

    let create_sql = format!("CREATE TABLE METADATA ({})", col_defs.join(", "));
    conn.execute(&create_sql, [])?;

    // Prepare INSERT statement
    let placeholders: Vec<&str> = std::iter::repeat_n("?", columns.len() + 1).collect();
    let col_names: Vec<String> = columns.iter().map(|c| format!("\"{}\"", c)).collect();
    let insert_sql = format!(
        "INSERT INTO METADATA (\"{}\", {}) VALUES ({})",
        SUBSET_COLUMN,
        col_names.join(", "),
        placeholders.join(", ")
    );

    // Insert rows
    let mut stmt = conn.prepare(&insert_sql)?;
    for (i, item) in metadata.iter().enumerate() {
        let mut values: Vec<Box<dyn ToSql>> = vec![Box::new(doc_ids[i])];
        if let Value::Object(obj) = item {
            for col in &columns {
                let value = obj.get(col).unwrap_or(&Value::Null);
                values.push(json_to_sql(value));
            }
        } else {
            // If not an object, insert nulls
            for _ in &columns {
                values.push(Box::new(None::<String>));
            }
        }
        let params: Vec<&dyn ToSql> = values.iter().map(|v| v.as_ref()).collect();
        stmt.execute(params_from_iter(params))?;
    }

    Ok(metadata.len())
}

/// Append new metadata rows to an existing database, adding columns if needed.
///
/// New columns found in the metadata are automatically added to the table.
/// The `_subset_` IDs are provided explicitly via `doc_ids` to keep them in sync with the index.
///
/// # Arguments
///
/// * `index_path` - Path to the index directory
/// * `metadata` - Slice of JSON objects for new documents
/// * `doc_ids` - Document IDs to use as `_subset_` values (must match `metadata` length)
///
/// # Returns
///
/// Number of rows inserted
///
/// # Errors
///
/// Returns an error if:
/// - The database doesn't exist
/// - Column names are invalid
/// - Database operations fail
/// - `metadata` length doesn't match `doc_ids` length
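///
/// # Example
///
/// A minimal sketch of appending metadata for newly added documents; the index
/// path and IDs below are illustrative:
///
/// ```ignore
/// use next_plaid::filtering;
/// use serde_json::json;
///
/// // Two documents already exist with _subset_ IDs 0 and 1.
/// let new_metadata = vec![json!({"name": "Charlie", "city": "NYC"})];
/// let new_doc_ids = vec![2];
///
/// filtering::update("my_index", &new_metadata, &new_doc_ids)?;
/// ```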
pub fn update(index_path: &str, metadata: &[Value], doc_ids: &[i64]) -> Result<usize> {
    if metadata.is_empty() {
        return Ok(0);
    }

    // Validate doc_ids length matches metadata
    if metadata.len() != doc_ids.len() {
        return Err(Error::Filtering(format!(
            "Metadata length ({}) must match doc_ids length ({})",
            metadata.len(),
            doc_ids.len()
        )));
    }

    let db_path = get_db_path(index_path);
    if !db_path.exists() {
        return Err(Error::Filtering(
            "Metadata database does not exist. Use create() first.".into(),
        ));
    }

    let conn = Connection::open(&db_path)?;

    // Get existing columns
    let mut existing_columns: Vec<String> = Vec::new();
    {
        let mut stmt = conn.prepare("PRAGMA table_info(METADATA)")?;
        let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
        for row in rows {
            let col = row?;
            if col != SUBSET_COLUMN {
                existing_columns.push(col);
            }
        }
    }

    // Find new columns and add them
    let mut new_columns: Vec<String> = Vec::new();
    let mut column_types: HashMap<String, &'static str> = HashMap::new();

    for item in metadata {
        if let Value::Object(obj) = item {
            for (key, value) in obj {
                if !existing_columns.contains(key) && !new_columns.contains(key) {
                    if !is_valid_column_name(key) {
                        return Err(Error::Filtering(format!(
                            "Invalid column name '{}'. Column names must start with a letter or \
                             underscore, followed by letters, digits, or underscores.",
                            key
                        )));
                    }
                    new_columns.push(key.clone());
                }
                if !value.is_null() && !column_types.contains_key(key) {
                    column_types.insert(key.clone(), infer_sql_type(value));
                }
            }
        }
    }

    // Add new columns to table
    for col in &new_columns {
        let sql_type = column_types.get(col).copied().unwrap_or("TEXT");
        let alter_sql = format!("ALTER TABLE METADATA ADD COLUMN \"{}\" {}", col, sql_type);
        conn.execute(&alter_sql, [])?;
    }

    // Get all columns (existing + new)
    let all_columns: Vec<String> = existing_columns.into_iter().chain(new_columns).collect();

    // Prepare INSERT statement
    let placeholders: Vec<&str> = std::iter::repeat_n("?", all_columns.len() + 1).collect();
    let col_names: Vec<String> = all_columns.iter().map(|c| format!("\"{}\"", c)).collect();
    let insert_sql = format!(
        "INSERT INTO METADATA (\"{}\", {}) VALUES ({})",
        SUBSET_COLUMN,
        col_names.join(", "),
        placeholders.join(", ")
    );

    // Insert rows
    let mut stmt = conn.prepare(&insert_sql)?;
    for (i, item) in metadata.iter().enumerate() {
        let mut values: Vec<Box<dyn ToSql>> = vec![Box::new(doc_ids[i])];
        if let Value::Object(obj) = item {
            for col in &all_columns {
                let value = obj.get(col).unwrap_or(&Value::Null);
                values.push(json_to_sql(value));
            }
        } else {
            for _ in &all_columns {
                values.push(Box::new(None::<String>));
            }
        }
        let params: Vec<&dyn ToSql> = values.iter().map(|v| v.as_ref()).collect();
        stmt.execute(params_from_iter(params))?;
    }

    Ok(metadata.len())
}

/// Delete rows by subset IDs and re-index the `_subset_` column to be sequential.
///
/// After deletion, remaining documents are re-indexed to maintain sequential
/// `_subset_` IDs starting from 0. This matches fast-plaid behavior.
///
/// # Arguments
///
/// * `index_path` - Path to the index directory
/// * `subset` - Slice of document IDs to delete (must be sorted ascending)
///
/// # Returns
///
/// Number of rows actually deleted
///
/// # Errors
///
/// Returns an error if the database operations fail.
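///
/// # Example
///
/// A minimal sketch of deleting two documents and observing the re-indexing;
/// the index path and IDs are illustrative:
///
/// ```ignore
/// use next_plaid::filtering;
///
/// // The index currently holds _subset_ IDs 0, 1, 2, 3.
/// let deleted = filtering::delete("my_index", &[1, 2])?;
/// assert_eq!(deleted, 2);
/// // The two surviving rows are renumbered to _subset_ IDs 0 and 1.
/// ```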
pub fn delete(index_path: &str, subset: &[i64]) -> Result<usize> {
    if subset.is_empty() {
        return Ok(0);
    }

    let db_path = get_db_path(index_path);
    if !db_path.exists() {
        return Ok(0);
    }

    let conn = Connection::open(&db_path)?;

    // Start transaction
    conn.execute("BEGIN", [])?;

    // Delete specified rows
    let placeholders: Vec<String> = subset.iter().map(|_| "?".to_string()).collect();
    let delete_sql = format!(
        "DELETE FROM METADATA WHERE \"{}\" IN ({})",
        SUBSET_COLUMN,
        placeholders.join(", ")
    );
    let subset_refs: Vec<&dyn ToSql> = subset.iter().map(|v| v as &dyn ToSql).collect();
    let deleted = conn.execute(&delete_sql, params_from_iter(subset_refs))?;

    // Get column names (excluding _subset_)
    let mut columns: Vec<String> = Vec::new();
    {
        let mut stmt = conn.prepare("PRAGMA table_info(METADATA)")?;
        let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
        for row in rows {
            let col = row?;
            if col != SUBSET_COLUMN {
                columns.push(col);
            }
        }
    }

    let col_str = columns
        .iter()
        .map(|c| format!("\"{}\"", c))
        .collect::<Vec<_>>()
        .join(", ");

    // Create temp table with re-indexed _subset_ values
    let create_temp_sql = format!(
        "CREATE TEMP TABLE METADATA_TEMP AS \
         SELECT (ROW_NUMBER() OVER (ORDER BY \"{}\")) - 1 AS new_subset_id, {} \
         FROM METADATA",
        SUBSET_COLUMN, col_str
    );
    conn.execute(&create_temp_sql, [])?;

    // Clear original table
    conn.execute("DELETE FROM METADATA", [])?;

    // Copy back with new IDs
    let insert_back_sql = format!(
        "INSERT INTO METADATA (\"{}\", {}) \
         SELECT new_subset_id, {} FROM METADATA_TEMP",
        SUBSET_COLUMN, col_str, col_str
    );
    conn.execute(&insert_back_sql, [])?;

    // Drop temp table
    conn.execute("DROP TABLE METADATA_TEMP", [])?;

    // Commit transaction
    conn.execute("COMMIT", [])?;

    Ok(deleted)
}

/// Query the database and return matching `_subset_` IDs.
///
/// # Arguments
///
/// * `index_path` - Path to the index directory
/// * `condition` - SQL WHERE clause with `?` placeholders (e.g., "category = ? AND score > ?")
/// * `parameters` - Values to substitute for placeholders
///
/// # Returns
///
/// Vector of `_subset_` IDs matching the condition
///
/// # Example
///
/// ```ignore
/// use next_plaid::filtering;
/// use serde_json::json;
///
/// let subset = filtering::where_condition(
///     "my_index",
///     "category = ? AND score > ?",
///     &[json!("A"), json!(90)],
/// )?;
/// ```
pub fn where_condition(
    index_path: &str,
    condition: &str,
    parameters: &[Value],
) -> Result<Vec<i64>> {
    let db_path = get_db_path(index_path);
    if !db_path.exists() {
        return Err(Error::Filtering(
            "No metadata database found. Create it first by adding metadata during index creation."
                .into(),
        ));
    }

    let conn = Connection::open(&db_path)?;

    let query = format!(
        "SELECT \"{}\" FROM METADATA WHERE {}",
        SUBSET_COLUMN, condition
    );

    let params: Vec<Box<dyn ToSql>> = parameters.iter().map(json_to_sql).collect();
    let param_refs: Vec<&dyn ToSql> = params.iter().map(|v| v.as_ref()).collect();

    let mut stmt = conn.prepare(&query)?;
    let rows = stmt.query_map(params_from_iter(param_refs), |row| row.get::<_, i64>(0))?;

    let mut result = Vec::new();
    for row in rows {
        result.push(row?);
    }

    Ok(result)
}

/// Get full metadata rows by condition or subset IDs.
///
/// Returns metadata as JSON objects with the `_subset_` field included.
///
/// # Arguments
///
/// * `index_path` - Path to the index directory
/// * `condition` - Optional SQL WHERE clause (mutually exclusive with `subset`)
/// * `parameters` - Values for condition placeholders
/// * `subset` - Optional list of `_subset_` IDs to retrieve (mutually exclusive with `condition`)
///
/// # Returns
///
/// Vector of JSON objects representing metadata rows
///
/// # Ordering
///
/// - If `subset` is provided: Returns rows in the order specified by `subset`
/// - If `condition` is provided: Returns rows ordered by `_subset_` ascending
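///
/// # Example
///
/// A minimal sketch of both retrieval modes; the index path and values are
/// illustrative:
///
/// ```ignore
/// use next_plaid::filtering;
/// use serde_json::json;
///
/// // By condition: rows come back ordered by _subset_.
/// let rows = filtering::get("my_index", Some("score > ?"), &[json!(90)], None)?;
///
/// // By explicit subset: rows come back in the order of the IDs given.
/// let rows = filtering::get("my_index", None, &[], Some(&[2, 0]))?;
/// ```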
pub fn get(
    index_path: &str,
    condition: Option<&str>,
    parameters: &[Value],
    subset: Option<&[i64]>,
) -> Result<Vec<Value>> {
    if condition.is_some() && subset.is_some() {
        return Err(Error::Filtering(
            "Please provide either a 'condition' or a 'subset', not both.".into(),
        ));
    }

    let db_path = get_db_path(index_path);
    if !db_path.exists() {
        return Ok(Vec::new());
    }

    let conn = Connection::open(&db_path)?;

    // Get column names
    let mut columns: Vec<String> = Vec::new();
    {
        let mut stmt = conn.prepare("PRAGMA table_info(METADATA)")?;
        let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
        for row in rows {
            columns.push(row?);
        }
    }

    // Build query
    let (query, params): (String, Vec<Box<dyn ToSql>>) = if let Some(cond) = condition {
        let query = format!(
            "SELECT * FROM METADATA WHERE {} ORDER BY \"{}\"",
            cond, SUBSET_COLUMN
        );
        let params = parameters.iter().map(json_to_sql).collect();
        (query, params)
    } else if let Some(ids) = subset {
        if ids.is_empty() {
            return Ok(Vec::new());
        }
        let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
        let query = format!(
            "SELECT * FROM METADATA WHERE \"{}\" IN ({})",
            SUBSET_COLUMN,
            placeholders.join(", ")
        );
        let params: Vec<Box<dyn ToSql>> = ids
            .iter()
            .map(|&id| Box::new(id) as Box<dyn ToSql>)
            .collect();
        (query, params)
    } else {
        let query = format!("SELECT * FROM METADATA ORDER BY \"{}\"", SUBSET_COLUMN);
        (query, Vec::new())
    };

    let param_refs: Vec<&dyn ToSql> = params.iter().map(|v| v.as_ref()).collect();
    let mut stmt = conn.prepare(&query)?;
    let mut rows = stmt.query(params_from_iter(param_refs))?;

    let mut results: Vec<Value> = Vec::new();
    while let Some(row) = rows.next()? {
        let mut obj = serde_json::Map::new();
        for (i, col) in columns.iter().enumerate() {
            let value = row_to_json_value(row, i)?;
            obj.insert(col.clone(), value);
        }
        results.push(Value::Object(obj));
    }

    // If subset was provided, reorder results to match subset order
    if let Some(ids) = subset {
        let mut results_map: HashMap<i64, Value> = HashMap::new();
        for result in results {
            if let Some(id) = result.get(SUBSET_COLUMN).and_then(|v| v.as_i64()) {
                results_map.insert(id, result);
            }
        }
        results = ids.iter().filter_map(|id| results_map.remove(id)).collect();
    }

    Ok(results)
}

/// Helper to convert a rusqlite row column to JSON value.
fn row_to_json_value(row: &rusqlite::Row, idx: usize) -> SqliteResult<Value> {
    // Try to get the value in order of most likely types
    if let Ok(i) = row.get::<_, i64>(idx) {
        return Ok(Value::Number(i.into()));
    }
    if let Ok(f) = row.get::<_, f64>(idx) {
        return Ok(serde_json::Number::from_f64(f)
            .map(Value::Number)
            .unwrap_or(Value::Null));
    }
    if let Ok(s) = row.get::<_, String>(idx) {
        return Ok(Value::String(s));
    }
    if let Ok(b) = row.get::<_, Vec<u8>>(idx) {
        // Try to parse as JSON first
        if let Ok(v) = serde_json::from_slice(&b) {
            return Ok(v);
        }
        // Otherwise return as base64 string
        return Ok(Value::String(base64_encode(&b)));
    }
    Ok(Value::Null)
}

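/// Minimal standard-alphabet base64 encoder (with `=` padding), used to
/// render opaque BLOB values as strings when they are not valid JSON.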
fn base64_encode(data: &[u8]) -> String {
    let mut result = String::with_capacity(data.len() * 4 / 3 + 4);
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    for chunk in data.chunks(3) {
        let b0 = chunk[0] as usize;
        let b1 = chunk.get(1).copied().unwrap_or(0) as usize;
        let b2 = chunk.get(2).copied().unwrap_or(0) as usize;

        result.push(ALPHABET[b0 >> 2] as char);
        result.push(ALPHABET[((b0 & 0x03) << 4) | (b1 >> 4)] as char);

        if chunk.len() > 1 {
            result.push(ALPHABET[((b1 & 0x0f) << 2) | (b2 >> 6)] as char);
        } else {
            result.push('=');
        }

        if chunk.len() > 2 {
            result.push(ALPHABET[b2 & 0x3f] as char);
        } else {
            result.push('=');
        }
    }

    result
}

/// Get the number of documents in the metadata database.
pub fn count(index_path: &str) -> Result<usize> {
    let db_path = get_db_path(index_path);
    if !db_path.exists() {
        return Ok(0);
    }

    let conn = Connection::open(&db_path)?;
    let count: i64 = conn.query_row("SELECT COUNT(*) FROM METADATA", [], |row| row.get(0))?;
    Ok(count as usize)
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;

    fn setup_test_dir() -> TempDir {
        TempDir::new().unwrap()
    }

    #[test]
    fn test_create_empty() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let result = create(path, &[], &[]).unwrap();
        assert_eq!(result, 0);
        assert!(!exists(path));
    }

    #[test]
    fn test_create_with_metadata() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata = vec![
            json!({"name": "Alice", "age": 30, "score": 95.5}),
            json!({"name": "Bob", "age": 25, "score": 87.0}),
            json!({"name": "Charlie", "age": 35}),
        ];
        let doc_ids: Vec<i64> = (0..3).collect();

        let result = create(path, &metadata, &doc_ids).unwrap();
        assert_eq!(result, 3);
        assert!(exists(path));

        // Verify count
        assert_eq!(count(path).unwrap(), 3);
    }

    #[test]
    fn test_create_invalid_column_name() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata = vec![json!({"valid_name": "Alice", "invalid name": 30})];
        let doc_ids = vec![0];

        let result = create(path, &metadata, &doc_ids);
        assert!(result.is_err());
    }

    #[test]
    fn test_where_condition() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata = vec![
            json!({"name": "Alice", "category": "A", "score": 95}),
            json!({"name": "Bob", "category": "B", "score": 87}),
            json!({"name": "Charlie", "category": "A", "score": 92}),
        ];
        let doc_ids: Vec<i64> = (0..3).collect();

        create(path, &metadata, &doc_ids).unwrap();

        // Query by category
        let subset = where_condition(path, "category = ?", &[json!("A")]).unwrap();
        assert_eq!(subset, vec![0, 2]);

        // Query with multiple conditions
        let subset =
            where_condition(path, "category = ? AND score > ?", &[json!("A"), json!(93)]).unwrap();
        assert_eq!(subset, vec![0]);
    }

    #[test]
    fn test_get_all() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata = vec![
            json!({"name": "Alice", "age": 30}),
            json!({"name": "Bob", "age": 25}),
        ];
        let doc_ids: Vec<i64> = (0..2).collect();

        create(path, &metadata, &doc_ids).unwrap();

        let results = get(path, None, &[], None).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0]["name"], "Alice");
        assert_eq!(results[1]["name"], "Bob");
    }

    #[test]
    fn test_get_by_subset() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata = vec![
            json!({"name": "Alice"}),
            json!({"name": "Bob"}),
            json!({"name": "Charlie"}),
        ];
        let doc_ids: Vec<i64> = (0..3).collect();

        create(path, &metadata, &doc_ids).unwrap();

        // Get specific subset in order
        let results = get(path, None, &[], Some(&[2, 0])).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0]["name"], "Charlie");
        assert_eq!(results[1]["name"], "Alice");
    }

    #[test]
    fn test_update_adds_rows() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata1 = vec![json!({"name": "Alice"}), json!({"name": "Bob"})];
        let doc_ids1: Vec<i64> = (0..2).collect();

        create(path, &metadata1, &doc_ids1).unwrap();
        assert_eq!(count(path).unwrap(), 2);

        let metadata2 = vec![json!({"name": "Charlie"})];
        let doc_ids2 = vec![2]; // Next ID after the first batch

        update(path, &metadata2, &doc_ids2).unwrap();
        assert_eq!(count(path).unwrap(), 3);

        // Verify the new row has correct _subset_ ID
        let results = get(path, None, &[], None).unwrap();
        assert_eq!(results[2]["_subset_"], 2);
        assert_eq!(results[2]["name"], "Charlie");
    }

    #[test]
    fn test_update_adds_columns() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata1 = vec![json!({"name": "Alice"})];
        let doc_ids1 = vec![0];

        create(path, &metadata1, &doc_ids1).unwrap();

        let metadata2 = vec![json!({"name": "Bob", "age": 25, "city": "NYC"})];
        let doc_ids2 = vec![1];

        update(path, &metadata2, &doc_ids2).unwrap();

        // Verify new columns exist
        let results = get(path, None, &[], None).unwrap();
        assert_eq!(results[0]["name"], "Alice");
        assert!(results[0]["age"].is_null()); // Old row has null for new column
        assert_eq!(results[1]["age"], 25);
        assert_eq!(results[1]["city"], "NYC");
    }

    #[test]
    fn test_delete_and_reindex() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata = vec![
            json!({"name": "Alice"}),
            json!({"name": "Bob"}),
            json!({"name": "Charlie"}),
            json!({"name": "Diana"}),
        ];
        let doc_ids: Vec<i64> = (0..4).collect();

        create(path, &metadata, &doc_ids).unwrap();

        // Delete Bob (1) and Charlie (2)
        let deleted = delete(path, &[1, 2]).unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(count(path).unwrap(), 2);

        // Verify remaining rows have re-indexed _subset_ IDs
        let results = get(path, None, &[], None).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0]["_subset_"], 0);
        assert_eq!(results[0]["name"], "Alice");
        assert_eq!(results[1]["_subset_"], 1);
        assert_eq!(results[1]["name"], "Diana");
    }

    #[test]
    fn test_where_with_like() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata = vec![
            json!({"name": "Alice"}),
            json!({"name": "Alex"}),
            json!({"name": "Bob"}),
        ];
        let doc_ids: Vec<i64> = (0..3).collect();

        create(path, &metadata, &doc_ids).unwrap();

        let subset = where_condition(path, "name LIKE ?", &[json!("Al%")]).unwrap();
        assert_eq!(subset, vec![0, 1]);
    }

    #[test]
    fn test_is_valid_column_name() {
        assert!(is_valid_column_name("name"));
        assert!(is_valid_column_name("_private"));
        assert!(is_valid_column_name("column123"));
        assert!(is_valid_column_name("Col_Name_2"));

        assert!(!is_valid_column_name("123column")); // starts with number
        assert!(!is_valid_column_name("column name")); // space
        assert!(!is_valid_column_name("column-name")); // hyphen
        assert!(!is_valid_column_name("")); // empty
        assert!(!is_valid_column_name("col;drop")); // SQL injection attempt
    }

    #[test]
    fn test_type_inference() {
        let dir = setup_test_dir();
        let path = dir.path().to_str().unwrap();

        let metadata = vec![json!({
            "int_val": 42,
            "float_val": 3.125,
            "str_val": "hello",
            "bool_val": true,
            "null_val": null
        })];
        let doc_ids = vec![0];

        create(path, &metadata, &doc_ids).unwrap();

        let results = get(path, None, &[], None).unwrap();
        assert_eq!(results[0]["int_val"], 42);
        assert!((results[0]["float_val"].as_f64().unwrap() - 3.125).abs() < 0.001);
        assert_eq!(results[0]["str_val"], "hello");
        assert_eq!(results[0]["bool_val"], 1); // Bool stored as INTEGER
        assert!(results[0]["null_val"].is_null());
    }
}