1use anyhow::{Context, Result};
5use rusqlite::Connection;
6use serde_json::Value as JsonValue;
7use std::collections::HashMap;
8
9pub fn sqlite_value_to_json(value: &rusqlite::types::Value) -> Result<JsonValue> {
36 match value {
37 rusqlite::types::Value::Null => Ok(JsonValue::Null),
38
39 rusqlite::types::Value::Integer(i) => Ok(JsonValue::Number((*i).into())),
40
41 rusqlite::types::Value::Real(f) => {
42 if f.is_finite() {
45 serde_json::Number::from_f64(*f)
46 .map(JsonValue::Number)
47 .ok_or_else(|| anyhow::anyhow!("Failed to convert float {} to JSON number", f))
48 } else {
49 Ok(JsonValue::String(f.to_string()))
51 }
52 }
53
54 rusqlite::types::Value::Text(s) => Ok(JsonValue::String(s.clone())),
55
56 rusqlite::types::Value::Blob(b) => {
57 let encoded = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, b);
61 Ok(serde_json::json!({
62 "_type": "blob",
63 "data": encoded
64 }))
65 }
66 }
67}
68
69pub fn sqlite_row_to_json(row: HashMap<String, rusqlite::types::Value>) -> Result<JsonValue> {
96 let mut json_obj = serde_json::Map::new();
97
98 for (col_name, value) in row {
99 let json_value = sqlite_value_to_json(&value)
100 .with_context(|| format!("Failed to convert column '{}' to JSON", col_name))?;
101 json_obj.insert(col_name, json_value);
102 }
103
104 Ok(JsonValue::Object(json_obj))
105}
106
107pub fn convert_table_to_jsonb(conn: &Connection, table: &str) -> Result<Vec<(String, JsonValue)>> {
146 crate::jsonb::validate_table_name(table).context("Invalid table name for JSONB conversion")?;
148
149 tracing::info!("Converting SQLite table '{}' to JSONB", table);
150
151 let rows = crate::sqlite::reader::read_table_data(conn, table)
153 .with_context(|| format!("Failed to read data from table '{}'", table))?;
154
155 let id_column = detect_id_column(conn, table)?;
157
158 let mut result = Vec::with_capacity(rows.len());
159
160 for (row_num, row) in rows.into_iter().enumerate() {
161 let id = if let Some(ref id_col) = id_column {
163 match row.get(id_col) {
165 Some(rusqlite::types::Value::Integer(i)) => i.to_string(),
166 Some(rusqlite::types::Value::Text(s)) => s.clone(),
167 Some(rusqlite::types::Value::Real(f)) => f.to_string(),
168 _ => {
169 tracing::warn!(
171 "Row {} in table '{}' has invalid ID type, using row number",
172 row_num + 1,
173 table
174 );
175 (row_num + 1).to_string()
176 }
177 }
178 } else {
179 (row_num + 1).to_string()
182 };
183
184 let json_data = sqlite_row_to_json(row).with_context(|| {
186 format!(
187 "Failed to convert row {} in table '{}' to JSON",
188 row_num + 1,
189 table
190 )
191 })?;
192
193 result.push((id, json_data));
194 }
195
196 tracing::info!(
197 "Converted {} rows from table '{}' to JSONB",
198 result.len(),
199 table
200 );
201
202 Ok(result)
203}
204
205fn detect_id_column(conn: &Connection, table: &str) -> Result<Option<String>> {
210 let query = format!("PRAGMA table_info(\"{}\")", table);
212 let mut stmt = conn
213 .prepare(&query)
214 .with_context(|| format!("Failed to get table info for '{}'", table))?;
215
216 let columns: Vec<String> = stmt
217 .query_map([], |row| row.get::<_, String>(1))
218 .context("Failed to query table columns")?
219 .collect::<Result<Vec<_>, _>>()
220 .context("Failed to collect column names")?;
221
222 let id_candidates = ["id", "rowid", "_id"];
224 for candidate in &id_candidates {
225 if let Some(col) = columns.iter().find(|c| c.to_lowercase() == *candidate) {
226 tracing::debug!("Using column '{}' as ID for table '{}'", col, table);
227 return Ok(Some(col.clone()));
228 }
229 }
230
231 tracing::debug!(
232 "No ID column found for table '{}', will use row number",
233 table
234 );
235 Ok(None)
236}
237
238fn convert_batch_to_jsonb(
242 rows: Vec<HashMap<String, rusqlite::types::Value>>,
243 id_column: &Option<String>,
244 start_row_num: usize,
245 table: &str,
246) -> Result<Vec<(String, JsonValue)>> {
247 let mut result = Vec::with_capacity(rows.len());
248
249 for (batch_idx, mut row) in rows.into_iter().enumerate() {
250 let row_num = start_row_num + batch_idx;
251
252 row.remove("_rowid");
254
255 let id = if let Some(ref id_col) = id_column {
257 match row.get(id_col) {
258 Some(rusqlite::types::Value::Integer(i)) => i.to_string(),
259 Some(rusqlite::types::Value::Text(s)) => s.clone(),
260 Some(rusqlite::types::Value::Real(f)) => f.to_string(),
261 _ => (row_num + 1).to_string(),
262 }
263 } else {
264 (row_num + 1).to_string()
265 };
266
267 let json_data = sqlite_row_to_json(row).with_context(|| {
269 format!(
270 "Failed to convert row {} in table '{}' to JSON",
271 row_num + 1,
272 table
273 )
274 })?;
275
276 result.push((id, json_data));
277 }
278
279 Ok(result)
280}
281
282pub async fn convert_table_batched(
323 sqlite_conn: &Connection,
324 pg_client: &tokio_postgres::Client,
325 table: &str,
326 source_type: &str,
327 batch_size: Option<usize>,
328) -> Result<usize> {
329 use crate::sqlite::reader::{read_table_batch, BatchedTableReader};
330
331 let batch_size = batch_size.unwrap_or_else(crate::utils::calculate_optimal_batch_size);
333
334 tracing::info!(
335 "Starting batched conversion of table '{}' (batch_size={})",
336 table,
337 batch_size
338 );
339
340 let id_column = detect_id_column(sqlite_conn, table)?;
342
343 let mut reader = BatchedTableReader::new(sqlite_conn, table, batch_size)?;
345
346 let mut total_rows = 0usize;
347 let mut batch_num = 0usize;
348
349 while let Some(rows) = read_table_batch(sqlite_conn, &mut reader)? {
351 let batch_row_count = rows.len();
352 batch_num += 1;
353
354 tracing::debug!(
355 "Processing batch {} ({} rows) from table '{}'",
356 batch_num,
357 batch_row_count,
358 table
359 );
360
361 let jsonb_rows = convert_batch_to_jsonb(rows, &id_column, total_rows, table)?;
363
364 if !jsonb_rows.is_empty() {
366 crate::jsonb::writer::insert_jsonb_batch(pg_client, table, jsonb_rows, source_type)
367 .await
368 .with_context(|| {
369 format!(
370 "Failed to insert batch {} into PostgreSQL table '{}'",
371 batch_num, table
372 )
373 })?;
374 }
375
376 total_rows += batch_row_count;
377
378 if total_rows.is_multiple_of(100_000) {
380 tracing::info!(
381 "Progress: {} rows processed from table '{}'",
382 total_rows,
383 table
384 );
385 }
386 }
387
388 tracing::info!(
389 "Completed batched conversion of table '{}': {} total rows in {} batches",
390 table,
391 total_rows,
392 batch_num
393 );
394
395 Ok(total_rows)
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::types::Value;

    /// Opens an in-memory database and runs `statements` against it in order.
    fn memory_db(statements: &[&str]) -> Connection {
        let conn = Connection::open_in_memory().unwrap();
        for statement in statements {
            conn.execute(statement, []).unwrap();
        }
        conn
    }

    /// Integers map to JSON numbers.
    #[test]
    fn test_convert_integer() {
        let converted = sqlite_value_to_json(&Value::Integer(42)).unwrap();
        assert_eq!(converted, serde_json::json!(42));
    }

    /// Finite reals map to JSON numbers.
    #[test]
    fn test_convert_real() {
        let converted = sqlite_value_to_json(&Value::Real(42.75)).unwrap();
        assert_eq!(converted, serde_json::json!(42.75));
    }

    /// Text maps to JSON strings.
    #[test]
    fn test_convert_text() {
        let text = Value::Text("Hello, World!".to_string());
        let converted = sqlite_value_to_json(&text).unwrap();
        assert_eq!(converted, serde_json::json!("Hello, World!"));
    }

    /// NULL maps to JSON null.
    #[test]
    fn test_convert_null() {
        let converted = sqlite_value_to_json(&Value::Null).unwrap();
        assert_eq!(converted, JsonValue::Null);
    }

    /// Blobs become a tagged object whose base64 payload round-trips.
    #[test]
    fn test_convert_blob() {
        use base64::Engine as _;

        let blob_data = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f];
        let converted = sqlite_value_to_json(&Value::Blob(blob_data.clone())).unwrap();

        assert!(converted.is_object());
        assert_eq!(converted["_type"], "blob");

        let payload = converted["data"].as_str().unwrap();
        let round_tripped = base64::engine::general_purpose::STANDARD
            .decode(payload)
            .unwrap();
        assert_eq!(round_tripped, blob_data);
    }

    /// NaN and infinity are emitted as strings instead of failing.
    #[test]
    fn test_convert_non_finite_float() {
        for special in [f64::NAN, f64::INFINITY] {
            let converted = sqlite_value_to_json(&Value::Real(special)).unwrap();
            assert!(converted.is_string());
        }
    }

    /// A row map becomes an object with one key per column.
    #[test]
    fn test_sqlite_row_to_json() {
        let row = HashMap::from([
            ("id".to_string(), Value::Integer(1)),
            ("name".to_string(), Value::Text("Alice".to_string())),
            ("age".to_string(), Value::Integer(30)),
            ("balance".to_string(), Value::Real(100.50)),
            ("notes".to_string(), Value::Null),
        ]);

        let object = sqlite_row_to_json(row).unwrap();

        assert_eq!(object["id"], 1);
        assert_eq!(object["name"], "Alice");
        assert_eq!(object["age"], 30);
        assert_eq!(object["balance"], 100.50);
        assert_eq!(object["notes"], JsonValue::Null);
    }

    /// Rows of a table with an `id` column are keyed by that column.
    #[test]
    fn test_convert_table_to_jsonb() {
        let conn = memory_db(&[
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT,
                age INTEGER
            )",
            "INSERT INTO users (id, name, email, age) VALUES (1, 'Alice', 'alice@example.com', 30)",
            "INSERT INTO users (id, name, email, age) VALUES (2, 'Bob', 'bob@example.com', 25)",
        ]);

        let pairs = convert_table_to_jsonb(&conn, "users").unwrap();

        assert_eq!(pairs.len(), 2);

        let (first_id, first_json) = &pairs[0];
        assert_eq!(first_id, "1");
        assert_eq!(first_json["name"], "Alice");
        assert_eq!(first_json["email"], "alice@example.com");
        assert_eq!(first_json["age"], 30);

        let (second_id, second_json) = &pairs[1];
        assert_eq!(second_id, "2");
        assert_eq!(second_json["name"], "Bob");
    }

    /// Without an ID column the 1-based row number is used as the ID.
    #[test]
    fn test_convert_table_without_id_column() {
        let conn = memory_db(&[
            "CREATE TABLE logs (
                timestamp INTEGER,
                message TEXT
            )",
            "INSERT INTO logs (timestamp, message) VALUES (12345, 'Test message')",
        ]);

        let pairs = convert_table_to_jsonb(&conn, "logs").unwrap();

        assert_eq!(pairs.len(), 1);

        let (id, json) = &pairs[0];
        assert_eq!(id, "1");
        assert_eq!(json["message"], "Test message");
    }

    /// NULL column values survive as JSON null.
    #[test]
    fn test_convert_table_handles_null_values() {
        let conn = memory_db(&[
            "CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT
            )",
            "INSERT INTO users (id, name, email) VALUES (1, 'Alice', NULL)",
        ]);

        let pairs = convert_table_to_jsonb(&conn, "users").unwrap();

        assert_eq!(pairs.len(), 1);
        let (_, json) = &pairs[0];
        assert_eq!(json["name"], "Alice");
        assert_eq!(json["email"], JsonValue::Null);
    }

    /// BLOB columns come out as the tagged blob object.
    #[test]
    fn test_convert_table_with_blob() {
        let conn = memory_db(&["CREATE TABLE files (
                id INTEGER PRIMARY KEY,
                name TEXT,
                data BLOB
            )"]);

        let blob_data: Vec<u8> = vec![0x01, 0x02, 0x03, 0x04];
        conn.execute(
            "INSERT INTO files (id, name, data) VALUES (?1, ?2, ?3)",
            rusqlite::params![1, "test.bin", &blob_data],
        )
        .unwrap();

        let pairs = convert_table_to_jsonb(&conn, "files").unwrap();

        assert_eq!(pairs.len(), 1);
        let (_, json) = &pairs[0];
        assert_eq!(json["name"], "test.bin");

        let blob_json = &json["data"];
        assert!(blob_json.is_object());
        assert_eq!(blob_json["_type"], "blob");
        assert!(blob_json["data"].is_string());
    }

    /// ID column detection ignores the casing of the column name.
    #[test]
    fn test_detect_id_column_case_insensitive() {
        let conn = memory_db(&["CREATE TABLE test (ID INTEGER PRIMARY KEY, value TEXT)"]);

        let detected = detect_id_column(&conn, "test").unwrap();
        assert!(detected.is_some());
        assert_eq!(detected.unwrap().to_lowercase(), "id");
    }

    /// An empty table converts to an empty result.
    #[test]
    fn test_convert_empty_table() {
        let conn = memory_db(&["CREATE TABLE empty (id INTEGER PRIMARY KEY)"]);

        let pairs = convert_table_to_jsonb(&conn, "empty").unwrap();
        assert_eq!(pairs.len(), 0);
    }
}