1use anyhow::{Context, Result};
5use rusqlite::Connection;
6use serde_json::Value as JsonValue;
7use std::collections::HashMap;
8
9pub fn sqlite_value_to_json(value: &rusqlite::types::Value) -> Result<JsonValue> {
36 match value {
37 rusqlite::types::Value::Null => Ok(JsonValue::Null),
38
39 rusqlite::types::Value::Integer(i) => Ok(JsonValue::Number((*i).into())),
40
41 rusqlite::types::Value::Real(f) => {
42 if f.is_finite() {
45 serde_json::Number::from_f64(*f)
46 .map(JsonValue::Number)
47 .ok_or_else(|| anyhow::anyhow!("Failed to convert float {} to JSON number", f))
48 } else {
49 Ok(JsonValue::String(f.to_string()))
51 }
52 }
53
54 rusqlite::types::Value::Text(s) => Ok(JsonValue::String(s.clone())),
55
56 rusqlite::types::Value::Blob(b) => {
57 let encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, b);
61 Ok(serde_json::json!({
62 "_type": "blob",
63 "data": encoded
64 }))
65 }
66 }
67}
68
69pub fn sqlite_row_to_json(row: HashMap<String, rusqlite::types::Value>) -> Result<JsonValue> {
96 let mut json_obj = serde_json::Map::new();
97
98 for (col_name, value) in row {
99 let json_value = sqlite_value_to_json(&value)
100 .with_context(|| format!("Failed to convert column '{}' to JSON", col_name))?;
101 json_obj.insert(col_name, json_value);
102 }
103
104 Ok(JsonValue::Object(json_obj))
105}
106
107pub fn convert_table_to_jsonb(conn: &Connection, table: &str) -> Result<Vec<(String, JsonValue)>> {
146 crate::jsonb::validate_table_name(table).context("Invalid table name for JSONB conversion")?;
148
149 tracing::info!("Converting SQLite table '{}' to JSONB", table);
150
151 let rows = crate::sqlite::reader::read_table_data(conn, table)
153 .with_context(|| format!("Failed to read data from table '{}'", table))?;
154
155 let id_column = detect_id_column(conn, table)?;
157
158 let mut result = Vec::with_capacity(rows.len());
159
160 for (row_num, row) in rows.into_iter().enumerate() {
161 let id = if let Some(ref id_col) = id_column {
163 match row.get(id_col) {
165 Some(rusqlite::types::Value::Integer(i)) => i.to_string(),
166 Some(rusqlite::types::Value::Text(s)) => s.clone(),
167 Some(rusqlite::types::Value::Real(f)) => f.to_string(),
168 _ => {
169 tracing::warn!(
171 "Row {} in table '{}' has invalid ID type, using row number",
172 row_num + 1,
173 table
174 );
175 (row_num + 1).to_string()
176 }
177 }
178 } else {
179 (row_num + 1).to_string()
182 };
183
184 let json_data = sqlite_row_to_json(row).with_context(|| {
186 format!(
187 "Failed to convert row {} in table '{}' to JSON",
188 row_num + 1,
189 table
190 )
191 })?;
192
193 result.push((id, json_data));
194 }
195
196 tracing::info!(
197 "Converted {} rows from table '{}' to JSONB",
198 result.len(),
199 table
200 );
201
202 Ok(result)
203}
204
205fn detect_id_column(conn: &Connection, table: &str) -> Result<Option<String>> {
210 crate::jsonb::validate_table_name(table).context("Invalid SQLite table name")?;
211
212 let query = format!("PRAGMA table_info({})", crate::utils::quote_ident(table));
214 let mut stmt = conn
215 .prepare(&query)
216 .with_context(|| format!("Failed to get table info for '{}'", table))?;
217
218 let mut columns: Vec<String> = Vec::new();
219 let mut pk_columns: Vec<(i64, String)> = Vec::new();
220
221 let rows = stmt
222 .query_map([], |row| {
223 let name: String = row.get(1)?;
224 let pk_position: i64 = row.get(5)?;
225 Ok((name, pk_position))
226 })
227 .context("Failed to query table columns")?;
228
229 for row in rows {
230 let (name, pk_position) = row.context("Failed to parse table_info row")?;
231 if pk_position > 0 {
232 pk_columns.push((pk_position, name.clone()));
233 }
234 columns.push(name);
235 }
236
237 pk_columns.sort_by_key(|(pos, _)| *pos);
238 if pk_columns.len() == 1 {
239 let pk = pk_columns.remove(0).1;
240 tracing::debug!(
241 "Using primary key column '{}' as ID for table '{}'",
242 pk,
243 table
244 );
245 return Ok(Some(pk));
246 } else if pk_columns.len() > 1 {
247 tracing::info!(
248 "Table '{}' has a composite primary key; falling back to row numbers",
249 table
250 );
251 return Ok(None);
252 }
253
254 let id_candidates = ["id", "rowid", "_id"];
256 for candidate in &id_candidates {
257 if let Some(col) = columns.iter().find(|c| c.to_lowercase() == *candidate) {
258 if column_is_unique(conn, table, col)? {
259 tracing::debug!("Using unique column '{}' as ID for table '{}'", col, table);
260 return Ok(Some(col.clone()));
261 } else {
262 tracing::warn!(
263 "Column '{}' on table '{}' contains duplicate values; using row numbers instead",
264 col,
265 table
266 );
267 }
268 }
269 }
270
271 tracing::debug!(
272 "No unique ID column found for table '{}', will use row number",
273 table
274 );
275 Ok(None)
276}
277
278fn column_is_unique(conn: &Connection, table: &str, column: &str) -> Result<bool> {
279 crate::jsonb::validate_table_name(column).context("Invalid column name")?;
280
281 let query = format!(
282 "SELECT COUNT(*) as total_rows, COUNT(DISTINCT {}) as distinct_rows FROM {}",
283 crate::utils::quote_ident(column),
284 crate::utils::quote_ident(table)
285 );
286
287 let (total_rows, distinct_rows): (i64, i64) = conn
288 .query_row(&query, [], |row| Ok((row.get(0)?, row.get(1)?)))
289 .with_context(|| {
290 format!(
291 "Failed to evaluate uniqueness of column '{}' on table '{}'",
292 column, table
293 )
294 })?;
295
296 Ok(total_rows == distinct_rows)
297}
298
299fn convert_batch_to_jsonb(
303 rows: Vec<HashMap<String, rusqlite::types::Value>>,
304 id_column: &Option<String>,
305 start_row_num: usize,
306 table: &str,
307) -> Result<Vec<(String, JsonValue)>> {
308 let mut result = Vec::with_capacity(rows.len());
309
310 for (batch_idx, mut row) in rows.into_iter().enumerate() {
311 let row_num = start_row_num + batch_idx;
312
313 row.remove("_rowid");
315
316 let id = if let Some(ref id_col) = id_column {
318 match row.get(id_col) {
319 Some(rusqlite::types::Value::Integer(i)) => i.to_string(),
320 Some(rusqlite::types::Value::Text(s)) => s.clone(),
321 Some(rusqlite::types::Value::Real(f)) => f.to_string(),
322 _ => (row_num + 1).to_string(),
323 }
324 } else {
325 (row_num + 1).to_string()
326 };
327
328 let json_data = sqlite_row_to_json(row).with_context(|| {
330 format!(
331 "Failed to convert row {} in table '{}' to JSON",
332 row_num + 1,
333 table
334 )
335 })?;
336
337 result.push((id, json_data));
338 }
339
340 Ok(result)
341}
342
343pub async fn convert_table_batched(
384 sqlite_conn: &Connection,
385 pg_client: &tokio_postgres::Client,
386 table: &str,
387 source_type: &str,
388 batch_size: Option<usize>,
389) -> Result<usize> {
390 use crate::sqlite::reader::{read_table_batch, BatchedTableReader};
391
392 let batch_size = batch_size.unwrap_or_else(crate::utils::calculate_optimal_batch_size);
394
395 tracing::info!(
396 "Starting batched conversion of table '{}' (batch_size={})",
397 table,
398 batch_size
399 );
400
401 let id_column = detect_id_column(sqlite_conn, table)?;
403
404 let mut reader = BatchedTableReader::new(sqlite_conn, table, batch_size)?;
406
407 let mut total_rows = 0usize;
408 let mut batch_num = 0usize;
409
410 while let Some(rows) = read_table_batch(sqlite_conn, &mut reader)? {
412 let batch_row_count = rows.len();
413 batch_num += 1;
414
415 tracing::debug!(
416 "Processing batch {} ({} rows) from table '{}'",
417 batch_num,
418 batch_row_count,
419 table
420 );
421
422 let jsonb_rows = convert_batch_to_jsonb(rows, &id_column, total_rows, table)?;
424
425 if !jsonb_rows.is_empty() {
427 crate::jsonb::writer::insert_jsonb_batch(pg_client, table, jsonb_rows, source_type)
428 .await
429 .with_context(|| {
430 format!(
431 "Failed to insert batch {} into PostgreSQL table '{}'",
432 batch_num, table
433 )
434 })?;
435 }
436
437 total_rows += batch_row_count;
438
439 if total_rows.is_multiple_of(100_000) {
441 tracing::info!(
442 "Progress: {} rows processed from table '{}'",
443 total_rows,
444 table
445 );
446 }
447 }
448
449 tracing::info!(
450 "Completed batched conversion of table '{}': {} total rows in {} batches",
451 table,
452 total_rows,
453 batch_num
454 );
455
456 Ok(total_rows)
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::types::Value;

    /// Opens a fresh in-memory database and runs each setup statement in order.
    fn db_with(statements: &[&str]) -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        for &sql in statements {
            conn.execute(sql, []).unwrap();
        }
        conn
    }

    // ---- single-value conversion ----

    #[test]
    fn test_convert_integer() {
        let converted = sqlite_value_to_json(&Value::Integer(42)).unwrap();
        assert_eq!(converted, serde_json::json!(42));
    }

    #[test]
    fn test_convert_real() {
        let converted = sqlite_value_to_json(&Value::Real(42.75)).unwrap();
        assert_eq!(converted, serde_json::json!(42.75));
    }

    #[test]
    fn test_convert_text() {
        let input = Value::Text("Hello, World!".to_string());
        let converted = sqlite_value_to_json(&input).unwrap();
        assert_eq!(converted, serde_json::json!("Hello, World!"));
    }

    #[test]
    fn test_convert_null() {
        let converted = sqlite_value_to_json(&Value::Null).unwrap();
        assert_eq!(converted, JsonValue::Null);
    }

    #[test]
    fn test_convert_blob() {
        let payload = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f];
        let converted = sqlite_value_to_json(&Value::Blob(payload.clone())).unwrap();

        // Blobs are wrapped in a tagged object.
        assert!(converted.is_object());
        assert_eq!(converted["_type"], "blob");

        // The payload must survive a base64 round trip.
        let text = converted["data"].as_str().unwrap();
        let round_trip =
            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, text).unwrap();
        assert_eq!(round_trip, payload);
    }

    #[test]
    fn test_convert_non_finite_float() {
        for special in [f64::NAN, f64::INFINITY] {
            let converted = sqlite_value_to_json(&Value::Real(special)).unwrap();
            assert!(converted.is_string());
        }
    }

    // ---- row conversion ----

    #[test]
    fn test_sqlite_row_to_json() {
        let row = HashMap::from([
            ("id".to_string(), Value::Integer(1)),
            ("name".to_string(), Value::Text("Alice".to_string())),
            ("age".to_string(), Value::Integer(30)),
            ("balance".to_string(), Value::Real(100.50)),
            ("notes".to_string(), Value::Null),
        ]);

        let object = sqlite_row_to_json(row).unwrap();

        assert_eq!(object["id"], 1);
        assert_eq!(object["name"], "Alice");
        assert_eq!(object["age"], 30);
        assert_eq!(object["balance"], 100.50);
        assert_eq!(object["notes"], JsonValue::Null);
    }

    // ---- whole-table conversion ----

    #[test]
    fn test_convert_table_to_jsonb() {
        let conn = db_with(&[
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT,
                age INTEGER
            )",
            "INSERT INTO users (id, name, email, age) VALUES (1, 'Alice', 'alice@example.com', 30)",
            "INSERT INTO users (id, name, email, age) VALUES (2, 'Bob', 'bob@example.com', 25)",
        ]);

        let converted = convert_table_to_jsonb(&conn, "users").unwrap();
        assert_eq!(converted.len(), 2);

        let (first_id, first) = &converted[0];
        assert_eq!(first_id, "1");
        assert_eq!(first["name"], "Alice");
        assert_eq!(first["email"], "alice@example.com");
        assert_eq!(first["age"], 30);

        let (second_id, second) = &converted[1];
        assert_eq!(second_id, "2");
        assert_eq!(second["name"], "Bob");
    }

    #[test]
    fn test_convert_table_without_id_column() {
        let conn = db_with(&[
            "CREATE TABLE logs (
                timestamp INTEGER,
                message TEXT
            )",
            "INSERT INTO logs (timestamp, message) VALUES (12345, 'Test message')",
        ]);

        let converted = convert_table_to_jsonb(&conn, "logs").unwrap();
        assert_eq!(converted.len(), 1);

        // With no ID column the 1-based row number is used.
        let (id, object) = &converted[0];
        assert_eq!(id, "1");
        assert_eq!(object["message"], "Test message");
    }

    #[test]
    fn test_convert_table_handles_null_values() {
        let conn = db_with(&[
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT
            )",
            "INSERT INTO users (id, name, email) VALUES (1, 'Alice', NULL)",
        ]);

        let converted = convert_table_to_jsonb(&conn, "users").unwrap();
        assert_eq!(converted.len(), 1);

        let (_, object) = &converted[0];
        assert_eq!(object["name"], "Alice");
        assert_eq!(object["email"], JsonValue::Null);
    }

    #[test]
    fn test_convert_table_with_blob() {
        let conn = db_with(&[
            "CREATE TABLE files (
                id INTEGER PRIMARY KEY,
                name TEXT,
                data BLOB
            )",
        ]);

        let payload: Vec<u8> = vec![0x01, 0x02, 0x03, 0x04];
        conn.execute(
            "INSERT INTO files (id, name, data) VALUES (?1, ?2, ?3)",
            rusqlite::params![1, "test.bin", &payload],
        )
        .unwrap();

        let converted = convert_table_to_jsonb(&conn, "files").unwrap();
        assert_eq!(converted.len(), 1);

        let (_, object) = &converted[0];
        assert_eq!(object["name"], "test.bin");

        let blob = &object["data"];
        assert!(blob.is_object());
        assert_eq!(blob["_type"], "blob");
        assert!(blob["data"].is_string());
    }

    #[test]
    fn test_convert_empty_table() {
        let conn = db_with(&["CREATE TABLE empty (id INTEGER PRIMARY KEY)"]);

        let converted = convert_table_to_jsonb(&conn, "empty").unwrap();
        assert_eq!(converted.len(), 0);
    }

    // ---- ID column detection ----

    #[test]
    fn test_detect_id_column_case_insensitive() {
        let conn = db_with(&["CREATE TABLE test (ID INTEGER PRIMARY KEY, value TEXT)"]);

        let detected = detect_id_column(&conn, "test").unwrap();
        assert!(detected.is_some());
        assert_eq!(detected.unwrap().to_lowercase(), "id");
    }

    #[test]
    fn test_detect_id_column_rejects_duplicates() {
        let conn = db_with(&[
            "CREATE TABLE dup_ids (id TEXT, value TEXT)",
            "INSERT INTO dup_ids (id, value) VALUES ('A', 'one')",
            "INSERT INTO dup_ids (id, value) VALUES ('A', 'two')",
        ]);

        let detected = detect_id_column(&conn, "dup_ids").unwrap();
        assert!(detected.is_none(), "Duplicate ID column should be rejected");
    }

    #[test]
    fn test_detect_id_column_accepts_unique_text() {
        let conn = db_with(&[
            "CREATE TABLE unique_ids (id TEXT, value TEXT)",
            "INSERT INTO unique_ids (id, value) VALUES ('A', 'one'), ('B', 'two')",
        ]);

        let detected = detect_id_column(&conn, "unique_ids").unwrap();
        assert_eq!(detected.as_deref(), Some("id"));
    }
}