database_replicator/sqlite/
converter.rs

// ABOUTME: SQLite to JSONB type conversion for PostgreSQL storage
// ABOUTME: Handles all SQLite types with lossless conversion and BLOB base64 encoding

use anyhow::{Context, Result};
use rusqlite::Connection;
use serde_json::Value as JsonValue;
use std::collections::HashMap;

/// Convert a single SQLite value to JSON
///
/// Maps SQLite types to JSON types:
/// - INTEGER → number (i64)
/// - REAL → number (f64)
/// - TEXT → string (UTF-8)
/// - BLOB → object with base64-encoded data
/// - NULL → null
///
/// # Arguments
///
/// * `value` - SQLite value from rusqlite
///
/// # Returns
///
/// JSON value suitable for JSONB storage
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::sqlite::converter::sqlite_value_to_json;
/// # use rusqlite::types::Value;
/// let sqlite_int = Value::Integer(42);
/// let json = sqlite_value_to_json(&sqlite_int).unwrap();
/// assert_eq!(json, serde_json::json!(42));
/// ```
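///
/// BLOBs are wrapped in a tagged object rather than a plain string. A minimal
/// sketch of that mapping (the base64 payload depends on the input bytes):
///
/// ```no_run
/// # use database_replicator::sqlite::converter::sqlite_value_to_json;
/// # use rusqlite::types::Value;
/// let blob = Value::Blob(vec![0x01, 0x02]);
/// let json = sqlite_value_to_json(&blob).unwrap();
/// assert_eq!(json["_type"], "blob");
/// assert!(json["data"].is_string());
/// ```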
pub fn sqlite_value_to_json(value: &rusqlite::types::Value) -> Result<JsonValue> {
    match value {
        rusqlite::types::Value::Null => Ok(JsonValue::Null),

        rusqlite::types::Value::Integer(i) => Ok(JsonValue::Number((*i).into())),

        rusqlite::types::Value::Real(f) => {
            // Convert f64 to JSON number
            // Note: JSON can't represent NaN or Infinity, handle edge cases
            if f.is_finite() {
                serde_json::Number::from_f64(*f)
                    .map(JsonValue::Number)
                    .ok_or_else(|| anyhow::anyhow!("Failed to convert float {} to JSON number", f))
            } else {
                // Store non-finite numbers as strings for safety
                Ok(JsonValue::String(f.to_string()))
            }
        }

        rusqlite::types::Value::Text(s) => Ok(JsonValue::String(s.clone())),

        rusqlite::types::Value::Blob(b) => {
            // Encode BLOB as base64 in a JSON object
            // Format: {"_type": "blob", "data": "base64..."}
            // This allows distinguishing BLOBs from regular strings
            let encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, b);
            Ok(serde_json::json!({
                "_type": "blob",
                "data": encoded
            }))
        }
    }
}

/// Convert a SQLite row (HashMap) to a JSON object
///
/// Converts all column values to JSON and returns a JSON object
/// with column names as keys.
///
/// # Arguments
///
/// * `row` - HashMap of column_name → SQLite value
///
/// # Returns
///
/// JSON object ready for JSONB storage
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::sqlite::converter::sqlite_row_to_json;
/// # use std::collections::HashMap;
/// # use rusqlite::types::Value;
/// let mut row = HashMap::new();
/// row.insert("id".to_string(), Value::Integer(1));
/// row.insert("name".to_string(), Value::Text("Alice".to_string()));
/// let json = sqlite_row_to_json(row).unwrap();
/// assert_eq!(json["id"], 1);
/// assert_eq!(json["name"], "Alice");
/// ```
pub fn sqlite_row_to_json(row: HashMap<String, rusqlite::types::Value>) -> Result<JsonValue> {
    let mut json_obj = serde_json::Map::new();

    for (col_name, value) in row {
        let json_value = sqlite_value_to_json(&value)
            .with_context(|| format!("Failed to convert column '{}' to JSON", col_name))?;
        json_obj.insert(col_name, json_value);
    }

    Ok(JsonValue::Object(json_obj))
}

/// Convert an entire SQLite table to JSONB format
///
/// Reads all rows from a SQLite table and converts them to JSONB.
/// Returns a vector of (id, json_data) tuples ready for insertion.
///
/// # ID Generation Strategy
///
/// - If the table declares a single-column primary key, that column is used as the ID
/// - Otherwise, a column named "id", "rowid", or "_id" (case-insensitive) is used,
///   provided its values are unique
/// - If no suitable column is found, the 1-based row number is used
/// - IDs are converted to strings for consistency
///
/// # Arguments
///
/// * `conn` - SQLite database connection
/// * `table` - Table name (must be validated)
///
/// # Returns
///
/// Vector of (id_string, json_data) tuples for batch insert
///
/// # Security
///
/// Table name should be validated before calling this function.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::sqlite::{open_sqlite, converter::convert_table_to_jsonb};
/// # use database_replicator::jsonb::validate_table_name;
/// # fn example() -> anyhow::Result<()> {
/// let conn = open_sqlite("database.db")?;
/// let table = "users";
/// validate_table_name(table)?;
/// let rows = convert_table_to_jsonb(&conn, table)?;
/// println!("Converted {} rows to JSONB", rows.len());
/// # Ok(())
/// # }
/// ```
pub fn convert_table_to_jsonb(conn: &Connection, table: &str) -> Result<Vec<(String, JsonValue)>> {
    // Validate table name
    crate::jsonb::validate_table_name(table).context("Invalid table name for JSONB conversion")?;

    tracing::info!("Converting SQLite table '{}' to JSONB", table);

    // Read all rows using our reader
    let rows = crate::sqlite::reader::read_table_data(conn, table)
        .with_context(|| format!("Failed to read data from table '{}'", table))?;

    // Detect ID column
    let id_column = detect_id_column(conn, table)?;

    let mut result = Vec::with_capacity(rows.len());

    for (row_num, row) in rows.into_iter().enumerate() {
        // Extract or generate ID
        let id = if let Some(ref id_col) = id_column {
            // Use the specified ID column
            match row.get(id_col) {
                Some(rusqlite::types::Value::Integer(i)) => i.to_string(),
                Some(rusqlite::types::Value::Text(s)) => s.clone(),
                Some(rusqlite::types::Value::Real(f)) => f.to_string(),
                _ => {
                    // Fallback to row number if ID is NULL or unsupported type
                    tracing::warn!(
                        "Row {} in table '{}' has invalid ID type, using row number",
                        row_num + 1,
                        table
                    );
                    (row_num + 1).to_string()
                }
            }
        } else {
            // No suitable ID column detected; fall back to the row number.
            // Row numbers are 1-based, so add 1 to the 0-based index.
            (row_num + 1).to_string()
        };

        // Convert row to JSON
        let json_data = sqlite_row_to_json(row).with_context(|| {
            format!(
                "Failed to convert row {} in table '{}' to JSON",
                row_num + 1,
                table
            )
        })?;

        result.push((id, json_data));
    }

    tracing::info!(
        "Converted {} rows from table '{}' to JSONB",
        result.len(),
        table
    );

    Ok(result)
}

/// Detect the ID column for a table
///
/// Prefers a single-column declared primary key. Otherwise falls back to common
/// ID column names ("id", "rowid", "_id", case-insensitive), but only if the
/// column's values are unique. Returns None when no suitable column is found,
/// including for composite primary keys, so callers use row numbers instead.
fn detect_id_column(conn: &Connection, table: &str) -> Result<Option<String>> {
    crate::jsonb::validate_table_name(table).context("Invalid SQLite table name")?;

    // Get column metadata so we can detect declared primary keys
    let query = format!("PRAGMA table_info({})", crate::utils::quote_ident(table));
    let mut stmt = conn
        .prepare(&query)
        .with_context(|| format!("Failed to get table info for '{}'", table))?;

    let mut columns: Vec<String> = Vec::new();
    let mut pk_columns: Vec<(i64, String)> = Vec::new();

    let rows = stmt
        .query_map([], |row| {
            let name: String = row.get(1)?;
            let pk_position: i64 = row.get(5)?;
            Ok((name, pk_position))
        })
        .context("Failed to query table columns")?;

    for row in rows {
        let (name, pk_position) = row.context("Failed to parse table_info row")?;
        if pk_position > 0 {
            pk_columns.push((pk_position, name.clone()));
        }
        columns.push(name);
    }

    pk_columns.sort_by_key(|(pos, _)| *pos);
    if pk_columns.len() == 1 {
        let pk = pk_columns.remove(0).1;
        tracing::debug!(
            "Using primary key column '{}' as ID for table '{}'",
            pk,
            table
        );
        return Ok(Some(pk));
    } else if pk_columns.len() > 1 {
        tracing::info!(
            "Table '{}' has a composite primary key; falling back to row numbers",
            table
        );
        return Ok(None);
    }

    // No declared primary key – fall back to heuristic columns, but only if unique
    let id_candidates = ["id", "rowid", "_id"];
    for candidate in &id_candidates {
        if let Some(col) = columns.iter().find(|c| c.to_lowercase() == *candidate) {
            if column_is_unique(conn, table, col)? {
                tracing::debug!("Using unique column '{}' as ID for table '{}'", col, table);
                return Ok(Some(col.clone()));
            } else {
                tracing::warn!(
                    "Column '{}' on table '{}' contains duplicate values; using row numbers instead",
                    col,
                    table
                );
            }
        }
    }

    tracing::debug!(
        "No unique ID column found for table '{}', will use row number",
        table
    );
    Ok(None)
}

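/// Check whether every value in `column` is distinct.
///
/// Compares `COUNT(*)` against `COUNT(DISTINCT column)`. Because
/// `COUNT(DISTINCT ...)` skips NULLs while `COUNT(*)` does not, a column
/// containing NULLs is treated as non-unique here.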
fn column_is_unique(conn: &Connection, table: &str, column: &str) -> Result<bool> {
    crate::jsonb::validate_table_name(column).context("Invalid column name")?;

    let query = format!(
        "SELECT COUNT(*) as total_rows, COUNT(DISTINCT {}) as distinct_rows FROM {}",
        crate::utils::quote_ident(column),
        crate::utils::quote_ident(table)
    );

    let (total_rows, distinct_rows): (i64, i64) = conn
        .query_row(&query, [], |row| Ok((row.get(0)?, row.get(1)?)))
        .with_context(|| {
            format!(
                "Failed to evaluate uniqueness of column '{}' on table '{}'",
                column, table
            )
        })?;

    Ok(total_rows == distinct_rows)
}

/// Convert a batch of SQLite rows to JSONB format.
///
/// Converts a pre-read batch of rows, extracting IDs and converting to JSON.
fn convert_batch_to_jsonb(
    rows: Vec<HashMap<String, rusqlite::types::Value>>,
    id_column: &Option<String>,
    start_row_num: usize,
    table: &str,
) -> Result<Vec<(String, JsonValue)>> {
    let mut result = Vec::with_capacity(rows.len());

    for (batch_idx, mut row) in rows.into_iter().enumerate() {
        let row_num = start_row_num + batch_idx;

        // Remove internal _rowid tracking column before conversion
        row.remove("_rowid");

        // Extract or generate ID
        let id = if let Some(ref id_col) = id_column {
            match row.get(id_col) {
                Some(rusqlite::types::Value::Integer(i)) => i.to_string(),
                Some(rusqlite::types::Value::Text(s)) => s.clone(),
                Some(rusqlite::types::Value::Real(f)) => f.to_string(),
                _ => (row_num + 1).to_string(),
            }
        } else {
            (row_num + 1).to_string()
        };

        // Convert row to JSON
        let json_data = sqlite_row_to_json(row).with_context(|| {
            format!(
                "Failed to convert row {} in table '{}' to JSON",
                row_num + 1,
                table
            )
        })?;

        result.push((id, json_data));
    }

    Ok(result)
}

/// Convert and insert a SQLite table to PostgreSQL using batched processing.
///
/// This function uses memory-efficient batched processing to handle large tables:
/// 1. Reads rows in batches (batch size derived from available memory unless specified)
/// 2. Converts each batch to JSONB format
/// 3. Inserts each batch to PostgreSQL before reading the next
///
/// Memory usage stays constant regardless of table size.
///
/// # Arguments
///
/// * `sqlite_conn` - SQLite database connection
/// * `pg_client` - PostgreSQL client connection
/// * `table` - Table name to convert
/// * `source_type` - Source type label for metadata (e.g., "sqlite")
/// * `batch_size` - Optional batch size; when `None`, a memory-based size is calculated
///
/// # Returns
///
/// Total number of rows processed.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::sqlite::converter::convert_table_batched;
/// # async fn example(
/// #     sqlite_conn: &rusqlite::Connection,
/// #     pg_client: &tokio_postgres::Client,
/// # ) -> anyhow::Result<()> {
/// let rows_processed = convert_table_batched(
///     sqlite_conn,
///     pg_client,
///     "large_table",
///     "sqlite",
///     None,
/// ).await?;
/// println!("Processed {} rows", rows_processed);
/// # Ok(())
/// # }
/// ```
pub async fn convert_table_batched(
    sqlite_conn: &Connection,
    pg_client: &tokio_postgres::Client,
    table: &str,
    source_type: &str,
    batch_size: Option<usize>,
) -> Result<usize> {
    use crate::sqlite::reader::{read_table_batch, BatchedTableReader};

    // Use memory-based batch size calculation if not specified
    let batch_size = batch_size.unwrap_or_else(crate::utils::calculate_optimal_batch_size);

    tracing::info!(
        "Starting batched conversion of table '{}' (batch_size={})",
        table,
        batch_size
    );

    // Detect ID column once before processing batches
    let id_column = detect_id_column(sqlite_conn, table)?;

    // Create batched reader
    let mut reader = BatchedTableReader::new(sqlite_conn, table, batch_size)?;

    let mut total_rows = 0usize;
    let mut batch_num = 0usize;

    // Process batches until exhausted
    while let Some(rows) = read_table_batch(sqlite_conn, &mut reader)? {
        let batch_row_count = rows.len();
        batch_num += 1;

        tracing::debug!(
            "Processing batch {} ({} rows) from table '{}'",
            batch_num,
            batch_row_count,
            table
        );

        // Convert batch to JSONB
        let jsonb_rows = convert_batch_to_jsonb(rows, &id_column, total_rows, table)?;

        // Insert batch to PostgreSQL
        if !jsonb_rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(pg_client, table, jsonb_rows, source_type)
                .await
                .with_context(|| {
                    format!(
                        "Failed to insert batch {} into PostgreSQL table '{}'",
                        batch_num, table
                    )
                })?;
        }

        total_rows += batch_row_count;

        // Log progress roughly every 100,000 rows for large tables. Checking for a
        // boundary crossing works even when the batch size does not divide 100,000.
        if total_rows / 100_000 > (total_rows - batch_row_count) / 100_000 {
            tracing::info!(
                "Progress: {} rows processed from table '{}'",
                total_rows,
                table
            );
        }
    }

    tracing::info!(
        "Completed batched conversion of table '{}': {} total rows in {} batches",
        table,
        total_rows,
        batch_num
    );

    Ok(total_rows)
}

#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::types::Value;

    #[test]
    fn test_convert_integer() {
        let value = Value::Integer(42);
        let json = sqlite_value_to_json(&value).unwrap();
        assert_eq!(json, serde_json::json!(42));
    }

    #[test]
    fn test_convert_real() {
        let value = Value::Real(42.75);
        let json = sqlite_value_to_json(&value).unwrap();
        assert_eq!(json, serde_json::json!(42.75));
    }

    #[test]
    fn test_convert_text() {
        let value = Value::Text("Hello, World!".to_string());
        let json = sqlite_value_to_json(&value).unwrap();
        assert_eq!(json, serde_json::json!("Hello, World!"));
    }

    #[test]
    fn test_convert_null() {
        let value = Value::Null;
        let json = sqlite_value_to_json(&value).unwrap();
        assert_eq!(json, JsonValue::Null);
    }

    #[test]
    fn test_convert_blob() {
        let blob_data = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]; // "Hello" in bytes
        let value = Value::Blob(blob_data.clone());
        let json = sqlite_value_to_json(&value).unwrap();

        // Should be wrapped in an object with _type and data fields
        assert!(json.is_object());
        assert_eq!(json["_type"], "blob");

        // Decode and verify
        let encoded = json["data"].as_str().unwrap();
        let decoded =
            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, encoded).unwrap();
        assert_eq!(decoded, blob_data);
    }

    #[test]
    fn test_convert_non_finite_float() {
        let nan_value = Value::Real(f64::NAN);
        let json = sqlite_value_to_json(&nan_value).unwrap();
        // NaN should be converted to string
        assert!(json.is_string());

        let inf_value = Value::Real(f64::INFINITY);
        let json = sqlite_value_to_json(&inf_value).unwrap();
        // Infinity should be converted to string
        assert!(json.is_string());
    }

    #[test]
    fn test_sqlite_row_to_json() {
        let mut row = HashMap::new();
        row.insert("id".to_string(), Value::Integer(1));
        row.insert("name".to_string(), Value::Text("Alice".to_string()));
        row.insert("age".to_string(), Value::Integer(30));
        row.insert("balance".to_string(), Value::Real(100.50));
        row.insert("notes".to_string(), Value::Null);

        let json = sqlite_row_to_json(row).unwrap();

        assert_eq!(json["id"], 1);
        assert_eq!(json["name"], "Alice");
        assert_eq!(json["age"], 30);
        assert_eq!(json["balance"], 100.50);
        assert_eq!(json["notes"], JsonValue::Null);
    }

    #[test]
    fn test_convert_table_to_jsonb() {
        // Create a test database
        let conn = Connection::open_in_memory().unwrap();

        // Create test table with ID column
        conn.execute(
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT,
                age INTEGER
            )",
            [],
        )
        .unwrap();

        // Insert test data
        conn.execute(
            "INSERT INTO users (id, name, email, age) VALUES (1, 'Alice', 'alice@example.com', 30)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO users (id, name, email, age) VALUES (2, 'Bob', 'bob@example.com', 25)",
            [],
        )
        .unwrap();

        // Convert to JSONB
        let result = convert_table_to_jsonb(&conn, "users").unwrap();

        assert_eq!(result.len(), 2);

        // Check first row
        let (id1, json1) = &result[0];
        assert_eq!(id1, "1");
        assert_eq!(json1["name"], "Alice");
        assert_eq!(json1["email"], "alice@example.com");
        assert_eq!(json1["age"], 30);

        // Check second row
        let (id2, json2) = &result[1];
        assert_eq!(id2, "2");
        assert_eq!(json2["name"], "Bob");
    }

    #[test]
    fn test_convert_table_without_id_column() {
        // Create a test database
        let conn = Connection::open_in_memory().unwrap();

        // Create table WITHOUT explicit ID column
        conn.execute(
            "CREATE TABLE logs (
                timestamp INTEGER,
                message TEXT
            )",
            [],
        )
        .unwrap();

        // Insert test data
        conn.execute(
            "INSERT INTO logs (timestamp, message) VALUES (12345, 'Test message')",
            [],
        )
        .unwrap();

        // Convert to JSONB
        let result = convert_table_to_jsonb(&conn, "logs").unwrap();

        assert_eq!(result.len(), 1);

        // Should use row number as ID (1-indexed)
        let (id, json) = &result[0];
        assert_eq!(id, "1");
        assert_eq!(json["message"], "Test message");
    }

    #[test]
    fn test_convert_table_handles_null_values() {
        let conn = Connection::open_in_memory().unwrap();

        conn.execute(
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT
            )",
            [],
        )
        .unwrap();

        // Insert row with NULL values
        conn.execute(
            "INSERT INTO users (id, name, email) VALUES (1, 'Alice', NULL)",
            [],
        )
        .unwrap();

        let result = convert_table_to_jsonb(&conn, "users").unwrap();

        assert_eq!(result.len(), 1);
        let (_, json) = &result[0];
        assert_eq!(json["name"], "Alice");
        assert_eq!(json["email"], JsonValue::Null);
    }

    #[test]
    fn test_convert_table_with_blob() {
        let conn = Connection::open_in_memory().unwrap();

        conn.execute(
            "CREATE TABLE files (
                id INTEGER PRIMARY KEY,
                name TEXT,
                data BLOB
            )",
            [],
        )
        .unwrap();

        // Insert row with BLOB (must be Vec<u8>, not Vec<i32>)
        let blob_data: Vec<u8> = vec![0x01, 0x02, 0x03, 0x04];
        conn.execute(
            "INSERT INTO files (id, name, data) VALUES (?1, ?2, ?3)",
            rusqlite::params![1, "test.bin", &blob_data],
        )
        .unwrap();

        let result = convert_table_to_jsonb(&conn, "files").unwrap();

        assert_eq!(result.len(), 1);
        let (_, json) = &result[0];
        assert_eq!(json["name"], "test.bin");

        // BLOB should be base64-encoded
        assert!(json["data"].is_object());
        assert_eq!(json["data"]["_type"], "blob");
        assert!(json["data"]["data"].is_string());
    }

    #[test]
    fn test_detect_id_column_case_insensitive() {
        let conn = Connection::open_in_memory().unwrap();

        // Create table with uppercase ID column
        conn.execute("CREATE TABLE test (ID INTEGER PRIMARY KEY, value TEXT)", [])
            .unwrap();

        let id_col = detect_id_column(&conn, "test").unwrap();
        assert!(id_col.is_some());
        assert_eq!(id_col.unwrap().to_lowercase(), "id");
    }

    #[test]
    fn test_detect_id_column_rejects_duplicates() {
        let conn = Connection::open_in_memory().unwrap();

        conn.execute("CREATE TABLE dup_ids (id TEXT, value TEXT)", [])
            .unwrap();
        conn.execute("INSERT INTO dup_ids (id, value) VALUES ('A', 'one')", [])
            .unwrap();
        conn.execute("INSERT INTO dup_ids (id, value) VALUES ('A', 'two')", [])
            .unwrap();

        let id_col = detect_id_column(&conn, "dup_ids").unwrap();
        assert!(id_col.is_none(), "Duplicate ID column should be rejected");
    }

    #[test]
    fn test_detect_id_column_accepts_unique_text() {
        let conn = Connection::open_in_memory().unwrap();

        conn.execute("CREATE TABLE unique_ids (id TEXT, value TEXT)", [])
            .unwrap();
        conn.execute(
            "INSERT INTO unique_ids (id, value) VALUES ('A', 'one'), ('B', 'two')",
            [],
        )
        .unwrap();

        let id_col = detect_id_column(&conn, "unique_ids").unwrap();
        assert_eq!(id_col.as_deref(), Some("id"));
    }
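
    // A minimal sketch of the composite-primary-key fallback described in
    // detect_id_column: with more than one declared primary-key column, the
    // function is expected to return None so that row numbers are used instead.
    #[test]
    fn test_detect_id_column_composite_primary_key() {
        let conn = Connection::open_in_memory().unwrap();

        conn.execute(
            "CREATE TABLE pairs (a INTEGER, b INTEGER, PRIMARY KEY (a, b))",
            [],
        )
        .unwrap();

        let id_col = detect_id_column(&conn, "pairs").unwrap();
        assert!(
            id_col.is_none(),
            "Composite primary keys should fall back to row numbers"
        );
    }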

    #[test]
    fn test_convert_empty_table() {
        let conn = Connection::open_in_memory().unwrap();

        conn.execute("CREATE TABLE empty (id INTEGER PRIMARY KEY)", [])
            .unwrap();

        let result = convert_table_to_jsonb(&conn, "empty").unwrap();
        assert_eq!(result.len(), 0);
    }
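
    // A small sketch exercising the private convert_batch_to_jsonb helper: the
    // internal "_rowid" tracking key should be stripped from each row, and with
    // no ID column the 1-based row number (offset by start_row_num) becomes the ID.
    #[test]
    fn test_convert_batch_to_jsonb_strips_rowid_and_numbers_rows() {
        let mut row = HashMap::new();
        row.insert("_rowid".to_string(), Value::Integer(99));
        row.insert("message".to_string(), Value::Text("hello".to_string()));

        let result = convert_batch_to_jsonb(vec![row], &None, 10, "logs").unwrap();

        assert_eq!(result.len(), 1);
        let (id, json) = &result[0];
        assert_eq!(id, "11");
        assert!(json.get("_rowid").is_none());
        assert_eq!(json["message"], "hello");
    }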
}