database_replicator/sqlite/converter.rs

// ABOUTME: SQLite to JSONB type conversion for PostgreSQL storage
// ABOUTME: Handles all SQLite types with lossless conversion and BLOB base64 encoding

use anyhow::{Context, Result};
use rusqlite::Connection;
use serde_json::Value as JsonValue;
use std::collections::HashMap;

/// Convert a single SQLite value to JSON
///
/// Maps SQLite types to JSON types:
/// - INTEGER → number (i64)
/// - REAL → number (f64)
/// - TEXT → string (UTF-8)
/// - BLOB → object with base64-encoded data
/// - NULL → null
///
/// # Arguments
///
/// * `value` - SQLite value from rusqlite
///
/// # Returns
///
/// JSON value suitable for JSONB storage
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::sqlite::converter::sqlite_value_to_json;
/// # use rusqlite::types::Value;
/// let sqlite_int = Value::Integer(42);
/// let json = sqlite_value_to_json(&sqlite_int).unwrap();
/// assert_eq!(json, serde_json::json!(42));
/// ```
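///
/// BLOB values are wrapped in a tagged object so they can be distinguished from
/// plain strings; a minimal sketch of the expected shape:
///
/// ```no_run
/// # use database_replicator::sqlite::converter::sqlite_value_to_json;
/// # use rusqlite::types::Value;
/// let blob = Value::Blob(vec![0x01, 0x02]);
/// let json = sqlite_value_to_json(&blob).unwrap();
/// assert_eq!(json["_type"], "blob");
/// assert!(json["data"].is_string()); // base64-encoded payload
/// ```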
pub fn sqlite_value_to_json(value: &rusqlite::types::Value) -> Result<JsonValue> {
    match value {
        rusqlite::types::Value::Null => Ok(JsonValue::Null),

        rusqlite::types::Value::Integer(i) => Ok(JsonValue::Number((*i).into())),

        rusqlite::types::Value::Real(f) => {
            // Convert f64 to JSON number
            // Note: JSON can't represent NaN or Infinity, handle edge cases
            if f.is_finite() {
                serde_json::Number::from_f64(*f)
                    .map(JsonValue::Number)
                    .ok_or_else(|| anyhow::anyhow!("Failed to convert float {} to JSON number", f))
            } else {
                // Store non-finite numbers as strings for safety
                Ok(JsonValue::String(f.to_string()))
            }
        }

        rusqlite::types::Value::Text(s) => Ok(JsonValue::String(s.clone())),

        rusqlite::types::Value::Blob(b) => {
            // Encode BLOB as base64 in a JSON object
            // Format: {"_type": "blob", "data": "base64..."}
            // This allows distinguishing BLOBs from regular strings
            let encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, b);
            Ok(serde_json::json!({
                "_type": "blob",
                "data": encoded
            }))
        }
    }
}

/// Convert a SQLite row (HashMap) to JSON object
///
/// Converts all column values to JSON and returns a JSON object
/// with column names as keys.
///
/// # Arguments
///
/// * `row` - HashMap of column_name → SQLite value
///
/// # Returns
///
/// JSON object ready for JSONB storage
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::sqlite::converter::sqlite_row_to_json;
/// # use std::collections::HashMap;
/// # use rusqlite::types::Value;
/// let mut row = HashMap::new();
/// row.insert("id".to_string(), Value::Integer(1));
/// row.insert("name".to_string(), Value::Text("Alice".to_string()));
/// let json = sqlite_row_to_json(row).unwrap();
/// assert_eq!(json["id"], 1);
/// assert_eq!(json["name"], "Alice");
/// ```
pub fn sqlite_row_to_json(row: HashMap<String, rusqlite::types::Value>) -> Result<JsonValue> {
    let mut json_obj = serde_json::Map::new();

    for (col_name, value) in row {
        let json_value = sqlite_value_to_json(&value)
            .with_context(|| format!("Failed to convert column '{}' to JSON", col_name))?;
        json_obj.insert(col_name, json_value);
    }

    Ok(JsonValue::Object(json_obj))
}

/// Convert an entire SQLite table to JSONB format
///
/// Reads all rows from a SQLite table and converts them to JSONB.
/// Returns a vector of (id, json_data) tuples ready for insertion.
///
/// # ID Generation Strategy
///
/// - If the table has a column named "id", "rowid", or "_id" (case-insensitive), use it as the ID
/// - Otherwise, fall back to the 1-based row position in the result set
/// - IDs are converted to strings for consistency
///
/// # Arguments
///
/// * `conn` - SQLite database connection
/// * `table` - Table name (must be validated)
///
/// # Returns
///
/// Vector of (id_string, json_data) tuples for batch insert
///
/// # Security
///
/// Table name should be validated before calling this function.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::sqlite::{open_sqlite, converter::convert_table_to_jsonb};
/// # use database_replicator::jsonb::validate_table_name;
/// # fn example() -> anyhow::Result<()> {
/// let conn = open_sqlite("database.db")?;
/// let table = "users";
/// validate_table_name(table)?;
/// let rows = convert_table_to_jsonb(&conn, table)?;
/// println!("Converted {} rows to JSONB", rows.len());
/// # Ok(())
/// # }
/// ```
pub fn convert_table_to_jsonb(conn: &Connection, table: &str) -> Result<Vec<(String, JsonValue)>> {
    // Validate table name
    crate::jsonb::validate_table_name(table).context("Invalid table name for JSONB conversion")?;

    tracing::info!("Converting SQLite table '{}' to JSONB", table);

    // Read all rows using our reader
    let rows = crate::sqlite::reader::read_table_data(conn, table)
        .with_context(|| format!("Failed to read data from table '{}'", table))?;

    // Detect ID column
    let id_column = detect_id_column(conn, table)?;

    let mut result = Vec::with_capacity(rows.len());

    for (row_num, row) in rows.into_iter().enumerate() {
        // Extract or generate ID
        let id = if let Some(ref id_col) = id_column {
            // Use the specified ID column
            match row.get(id_col) {
                Some(rusqlite::types::Value::Integer(i)) => i.to_string(),
                Some(rusqlite::types::Value::Text(s)) => s.clone(),
                Some(rusqlite::types::Value::Real(f)) => f.to_string(),
                _ => {
                    // Fall back to the row number if the ID is NULL or an unsupported type
                    tracing::warn!(
                        "Row {} in table '{}' has a NULL or unsupported ID value, using row number",
                        row_num + 1,
                        table
                    );
                    (row_num + 1).to_string()
                }
            }
        } else {
            // No ID column found; fall back to the 1-based row position
            // (enumerate() is 0-indexed, so add 1)
            (row_num + 1).to_string()
        };

        // Convert row to JSON
        let json_data = sqlite_row_to_json(row).with_context(|| {
            format!(
                "Failed to convert row {} in table '{}' to JSON",
                row_num + 1,
                table
            )
        })?;

        result.push((id, json_data));
    }

    tracing::info!(
        "Converted {} rows from table '{}' to JSONB",
        result.len(),
        table
    );

    Ok(result)
}

/// Detect the ID column for a table
///
/// Checks for common ID column names: "id", "rowid", "_id" (case-insensitive).
/// If found, returns the column name. Otherwise returns None.
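///
/// A minimal usage sketch (not compiled as a doctest, since this helper is private):
///
/// ```ignore
/// // Assuming a table `users(id INTEGER PRIMARY KEY, ...)` exists in `conn`:
/// let id_col = detect_id_column(&conn, "users")?;
/// assert_eq!(id_col.as_deref(), Some("id"));
/// ```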
fn detect_id_column(conn: &Connection, table: &str) -> Result<Option<String>> {
    // Get column names for the table
    let query = format!("PRAGMA table_info(\"{}\")", table);
    let mut stmt = conn
        .prepare(&query)
        .with_context(|| format!("Failed to get table info for '{}'", table))?;

    let columns: Vec<String> = stmt
        .query_map([], |row| row.get::<_, String>(1))
        .context("Failed to query table columns")?
        .collect::<Result<Vec<_>, _>>()
        .context("Failed to collect column names")?;

    // Check for common ID column names (case-insensitive)
    let id_candidates = ["id", "rowid", "_id"];
    for candidate in &id_candidates {
        if let Some(col) = columns.iter().find(|c| c.to_lowercase() == *candidate) {
            tracing::debug!("Using column '{}' as ID for table '{}'", col, table);
            return Ok(Some(col.clone()));
        }
    }

    tracing::debug!(
        "No ID column found for table '{}', will use row number",
        table
    );
    Ok(None)
}

/// Convert a batch of SQLite rows to JSONB format.
///
/// Converts a pre-read batch of rows, extracting IDs and converting each row to JSON.
/// The reader's internal `_rowid` tracking column is removed before conversion.
fn convert_batch_to_jsonb(
    rows: Vec<HashMap<String, rusqlite::types::Value>>,
    id_column: &Option<String>,
    start_row_num: usize,
    table: &str,
) -> Result<Vec<(String, JsonValue)>> {
    let mut result = Vec::with_capacity(rows.len());

    for (batch_idx, mut row) in rows.into_iter().enumerate() {
        let row_num = start_row_num + batch_idx;

        // Remove internal _rowid tracking column before conversion
        row.remove("_rowid");

        // Extract or generate ID
        let id = if let Some(ref id_col) = id_column {
            match row.get(id_col) {
                Some(rusqlite::types::Value::Integer(i)) => i.to_string(),
                Some(rusqlite::types::Value::Text(s)) => s.clone(),
                Some(rusqlite::types::Value::Real(f)) => f.to_string(),
                _ => (row_num + 1).to_string(),
            }
        } else {
            (row_num + 1).to_string()
        };

        // Convert row to JSON
        let json_data = sqlite_row_to_json(row).with_context(|| {
            format!(
                "Failed to convert row {} in table '{}' to JSON",
                row_num + 1,
                table
            )
        })?;

        result.push((id, json_data));
    }

    Ok(result)
}

/// Convert and insert a SQLite table into PostgreSQL using batched processing.
///
/// This function uses memory-efficient batched processing to handle large tables:
/// 1. Reads rows in batches (batch size computed from available memory unless overridden)
/// 2. Converts each batch to JSONB format
/// 3. Inserts each batch into PostgreSQL before reading the next
///
/// Peak memory usage is bounded by the batch size rather than the table size.
///
/// # Arguments
///
/// * `sqlite_conn` - SQLite database connection
/// * `pg_client` - PostgreSQL client connection
/// * `table` - Table name to convert
/// * `source_type` - Source type label for metadata (e.g., "sqlite")
/// * `batch_size` - Optional batch size (default: computed from available memory
///   via `calculate_optimal_batch_size`)
///
/// # Returns
///
/// Total number of rows processed.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::sqlite::converter::convert_table_batched;
/// # async fn example(
/// #     sqlite_conn: &rusqlite::Connection,
/// #     pg_client: &tokio_postgres::Client,
/// # ) -> anyhow::Result<()> {
/// let rows_processed = convert_table_batched(
///     sqlite_conn,
///     pg_client,
///     "large_table",
///     "sqlite",
///     None,
/// ).await?;
/// println!("Processed {} rows", rows_processed);
/// # Ok(())
/// # }
/// ```
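///
/// An explicit batch size can also be supplied (a sketch; the table name and the
/// 5_000-row batch size here are arbitrary):
///
/// ```no_run
/// # use database_replicator::sqlite::converter::convert_table_batched;
/// # async fn example(
/// #     sqlite_conn: &rusqlite::Connection,
/// #     pg_client: &tokio_postgres::Client,
/// # ) -> anyhow::Result<()> {
/// let _rows = convert_table_batched(sqlite_conn, pg_client, "events", "sqlite", Some(5_000)).await?;
/// # Ok(())
/// # }
/// ```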
pub async fn convert_table_batched(
    sqlite_conn: &Connection,
    pg_client: &tokio_postgres::Client,
    table: &str,
    source_type: &str,
    batch_size: Option<usize>,
) -> Result<usize> {
    use crate::sqlite::reader::{read_table_batch, BatchedTableReader};

    // Use memory-based batch size calculation if not specified
    let batch_size = batch_size.unwrap_or_else(crate::utils::calculate_optimal_batch_size);

    tracing::info!(
        "Starting batched conversion of table '{}' (batch_size={})",
        table,
        batch_size
    );

    // Detect ID column once before processing batches
    let id_column = detect_id_column(sqlite_conn, table)?;

    // Create batched reader
    let mut reader = BatchedTableReader::new(sqlite_conn, table, batch_size)?;

    let mut total_rows = 0usize;
    let mut batch_num = 0usize;

    // Process batches until exhausted
    while let Some(rows) = read_table_batch(sqlite_conn, &mut reader)? {
        let batch_row_count = rows.len();
        batch_num += 1;

        tracing::debug!(
            "Processing batch {} ({} rows) from table '{}'",
            batch_num,
            batch_row_count,
            table
        );

        // Convert batch to JSONB
        let jsonb_rows = convert_batch_to_jsonb(rows, &id_column, total_rows, table)?;

        // Insert batch into PostgreSQL
        if !jsonb_rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(pg_client, table, jsonb_rows, source_type)
                .await
                .with_context(|| {
                    format!(
                        "Failed to insert batch {} into PostgreSQL table '{}'",
                        batch_num, table
                    )
                })?;
        }

        total_rows += batch_row_count;

        // Log progress roughly every 100,000 rows; check whether this batch crossed
        // a 100,000-row boundary so uneven batch sizes still trigger the log
        if total_rows / 100_000 > (total_rows - batch_row_count) / 100_000 {
            tracing::info!(
                "Progress: {} rows processed from table '{}'",
                total_rows,
                table
            );
        }
    }

    tracing::info!(
        "Completed batched conversion of table '{}': {} total rows in {} batches",
        table,
        total_rows,
        batch_num
    );

    Ok(total_rows)
}

#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::types::Value;

    #[test]
    fn test_convert_integer() {
        let value = Value::Integer(42);
        let json = sqlite_value_to_json(&value).unwrap();
        assert_eq!(json, serde_json::json!(42));
    }

    #[test]
    fn test_convert_real() {
        let value = Value::Real(42.75);
        let json = sqlite_value_to_json(&value).unwrap();
        assert_eq!(json, serde_json::json!(42.75));
    }

    #[test]
    fn test_convert_text() {
        let value = Value::Text("Hello, World!".to_string());
        let json = sqlite_value_to_json(&value).unwrap();
        assert_eq!(json, serde_json::json!("Hello, World!"));
    }

    #[test]
    fn test_convert_null() {
        let value = Value::Null;
        let json = sqlite_value_to_json(&value).unwrap();
        assert_eq!(json, JsonValue::Null);
    }

    #[test]
    fn test_convert_blob() {
        let blob_data = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]; // "Hello" in bytes
        let value = Value::Blob(blob_data.clone());
        let json = sqlite_value_to_json(&value).unwrap();

        // Should be wrapped in an object with _type and data fields
        assert!(json.is_object());
        assert_eq!(json["_type"], "blob");

        // Decode and verify
        let encoded = json["data"].as_str().unwrap();
        let decoded =
            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, encoded).unwrap();
        assert_eq!(decoded, blob_data);
    }

    #[test]
    fn test_convert_non_finite_float() {
        let nan_value = Value::Real(f64::NAN);
        let json = sqlite_value_to_json(&nan_value).unwrap();
        // NaN should be converted to string
        assert!(json.is_string());

        let inf_value = Value::Real(f64::INFINITY);
        let json = sqlite_value_to_json(&inf_value).unwrap();
        // Infinity should be converted to string
        assert!(json.is_string());
    }

    #[test]
    fn test_sqlite_row_to_json() {
        let mut row = HashMap::new();
        row.insert("id".to_string(), Value::Integer(1));
        row.insert("name".to_string(), Value::Text("Alice".to_string()));
        row.insert("age".to_string(), Value::Integer(30));
        row.insert("balance".to_string(), Value::Real(100.50));
        row.insert("notes".to_string(), Value::Null);

        let json = sqlite_row_to_json(row).unwrap();

        assert_eq!(json["id"], 1);
        assert_eq!(json["name"], "Alice");
        assert_eq!(json["age"], 30);
        assert_eq!(json["balance"], 100.50);
        assert_eq!(json["notes"], JsonValue::Null);
    }

    #[test]
    fn test_convert_table_to_jsonb() {
        // Create a test database
        let conn = Connection::open_in_memory().unwrap();

        // Create test table with ID column
        conn.execute(
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT,
                age INTEGER
            )",
            [],
        )
        .unwrap();

        // Insert test data
        conn.execute(
            "INSERT INTO users (id, name, email, age) VALUES (1, 'Alice', 'alice@example.com', 30)",
            [],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO users (id, name, email, age) VALUES (2, 'Bob', 'bob@example.com', 25)",
            [],
        )
        .unwrap();

        // Convert to JSONB
        let result = convert_table_to_jsonb(&conn, "users").unwrap();

        assert_eq!(result.len(), 2);

        // Check first row
        let (id1, json1) = &result[0];
        assert_eq!(id1, "1");
        assert_eq!(json1["name"], "Alice");
        assert_eq!(json1["email"], "alice@example.com");
        assert_eq!(json1["age"], 30);

        // Check second row
        let (id2, json2) = &result[1];
        assert_eq!(id2, "2");
        assert_eq!(json2["name"], "Bob");
    }
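
    #[test]
    fn test_convert_table_with_text_id() {
        // Sketch of the TEXT-ID path: text values in the detected ID column are
        // used verbatim as the row ID string (see the Text arm of the ID extraction).
        let conn = Connection::open_in_memory().unwrap();

        conn.execute("CREATE TABLE docs (id TEXT PRIMARY KEY, title TEXT)", [])
            .unwrap();
        conn.execute("INSERT INTO docs (id, title) VALUES ('doc-1', 'First')", [])
            .unwrap();

        let result = convert_table_to_jsonb(&conn, "docs").unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].0, "doc-1");
        assert_eq!(result[0].1["title"], "First");
    }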

    #[test]
    fn test_convert_table_without_id_column() {
        // Create a test database
        let conn = Connection::open_in_memory().unwrap();

        // Create table WITHOUT explicit ID column
        conn.execute(
            "CREATE TABLE logs (
                timestamp INTEGER,
                message TEXT
            )",
            [],
        )
        .unwrap();

        // Insert test data
        conn.execute(
            "INSERT INTO logs (timestamp, message) VALUES (12345, 'Test message')",
            [],
        )
        .unwrap();

        // Convert to JSONB
        let result = convert_table_to_jsonb(&conn, "logs").unwrap();

        assert_eq!(result.len(), 1);

        // Should use row number as ID (1-indexed)
        let (id, json) = &result[0];
        assert_eq!(id, "1");
        assert_eq!(json["message"], "Test message");
    }
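
    #[test]
    fn test_convert_table_null_id_falls_back_to_row_number() {
        // Sketch of the documented fallback: a NULL value in the detected ID
        // column should fall back to the 1-based row number.
        let conn = Connection::open_in_memory().unwrap();

        conn.execute("CREATE TABLE items (id INTEGER, label TEXT)", [])
            .unwrap();
        conn.execute("INSERT INTO items (id, label) VALUES (NULL, 'first')", [])
            .unwrap();

        let result = convert_table_to_jsonb(&conn, "items").unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].0, "1");
        assert_eq!(result[0].1["label"], "first");
    }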

    #[test]
    fn test_convert_table_handles_null_values() {
        let conn = Connection::open_in_memory().unwrap();

        conn.execute(
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT
            )",
            [],
        )
        .unwrap();

        // Insert row with NULL values
        conn.execute(
            "INSERT INTO users (id, name, email) VALUES (1, 'Alice', NULL)",
            [],
        )
        .unwrap();

        let result = convert_table_to_jsonb(&conn, "users").unwrap();

        assert_eq!(result.len(), 1);
        let (_, json) = &result[0];
        assert_eq!(json["name"], "Alice");
        assert_eq!(json["email"], JsonValue::Null);
    }

    #[test]
    fn test_convert_table_with_blob() {
        let conn = Connection::open_in_memory().unwrap();

        conn.execute(
            "CREATE TABLE files (
                id INTEGER PRIMARY KEY,
                name TEXT,
                data BLOB
            )",
            [],
        )
        .unwrap();

        // Insert row with BLOB (must be Vec<u8>, not Vec<i32>)
        let blob_data: Vec<u8> = vec![0x01, 0x02, 0x03, 0x04];
        conn.execute(
            "INSERT INTO files (id, name, data) VALUES (?1, ?2, ?3)",
            rusqlite::params![1, "test.bin", &blob_data],
        )
        .unwrap();

        let result = convert_table_to_jsonb(&conn, "files").unwrap();

        assert_eq!(result.len(), 1);
        let (_, json) = &result[0];
        assert_eq!(json["name"], "test.bin");

        // BLOB should be base64-encoded
        assert!(json["data"].is_object());
        assert_eq!(json["data"]["_type"], "blob");
        assert!(json["data"]["data"].is_string());
    }

    #[test]
    fn test_detect_id_column_case_insensitive() {
        let conn = Connection::open_in_memory().unwrap();

        // Create table with uppercase ID column
        conn.execute("CREATE TABLE test (ID INTEGER PRIMARY KEY, value TEXT)", [])
            .unwrap();

        let id_col = detect_id_column(&conn, "test").unwrap();
        assert!(id_col.is_some());
        assert_eq!(id_col.unwrap().to_lowercase(), "id");
    }
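
    #[test]
    fn test_detect_id_column_returns_none_without_candidates() {
        // Sketch: a table with none of the candidate columns ("id", "rowid", "_id")
        // should yield None so callers fall back to row numbers.
        let conn = Connection::open_in_memory().unwrap();

        conn.execute("CREATE TABLE notes (body TEXT, created INTEGER)", [])
            .unwrap();

        let id_col = detect_id_column(&conn, "notes").unwrap();
        assert!(id_col.is_none());
    }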

    #[test]
    fn test_convert_empty_table() {
        let conn = Connection::open_in_memory().unwrap();

        conn.execute("CREATE TABLE empty (id INTEGER PRIMARY KEY)", [])
            .unwrap();

        let result = convert_table_to_jsonb(&conn, "empty").unwrap();
        assert_eq!(result.len(), 0);
    }
}