database_replicator/jsonb/writer.rs

// ABOUTME: Write JSONB data to PostgreSQL with metadata
// ABOUTME: Handles table creation, single row inserts, and batch inserts

use anyhow::{bail, Context, Result};
use tokio_postgres::Client;

/// Create a table with JSONB schema for storing non-PostgreSQL data
///
/// Creates a table with the following structure:
/// - id: TEXT PRIMARY KEY (original document/row ID)
/// - data: JSONB NOT NULL (complete document/row as JSON)
/// - _source_type: TEXT NOT NULL ('sqlite', 'mongodb', or 'mysql')
/// - _migrated_at: TIMESTAMP NOT NULL DEFAULT NOW()
///
/// Also creates two indexes:
/// - GIN index on data column for efficient JSONB queries
/// - Index on _migrated_at for temporal queries
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table to create (must be validated)
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// CRITICAL: table_name MUST be validated with validate_table_name() before calling.
/// This function uses table_name in SQL directly (not parameterized) after validation.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::create_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// create_jsonb_table(client, table_name, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn create_jsonb_table(
    client: &Client,
    table_name: &str,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table creation")?;

    tracing::info!(
        "Creating JSONB table '{}' for source type '{}'",
        table_name,
        source_type
    );

    // Create table with JSONB schema
    // Note: table_name is validated above, so it's safe to use in SQL
    let create_table_sql = format!(
        r#"
        CREATE TABLE IF NOT EXISTS "{}" (
            id TEXT PRIMARY KEY,
            data JSONB NOT NULL,
            _source_type TEXT NOT NULL,
            _migrated_at TIMESTAMP NOT NULL DEFAULT NOW()
        )
        "#,
        table_name
    );

    client
        .execute(&create_table_sql, &[])
        .await
        .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

    // Create GIN index on data column for efficient JSONB queries
    let create_gin_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_data" ON "{}" USING GIN (data)"#,
        table_name, table_name
    );

    client
        .execute(&create_gin_index_sql, &[])
        .await
        .with_context(|| format!("Failed to create GIN index on table '{}'", table_name))?;

    // Create index on _migrated_at for temporal queries
    let create_time_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_migrated" ON "{}" (_migrated_at)"#,
        table_name, table_name
    );

    client
        .execute(&create_time_index_sql, &[])
        .await
        .with_context(|| {
            format!(
                "Failed to create _migrated_at index on table '{}'",
                table_name
            )
        })?;

    tracing::info!(
        "Successfully created JSONB table '{}' with indexes",
        table_name
    );

    Ok(())
}

/// Truncate a JSONB table to remove all existing data
///
/// This is used to make init idempotent - rerunning init will clear existing
/// data before inserting fresh data from the source.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table to truncate (must be validated)
///
/// # Security
///
/// CRITICAL: table_name MUST be validated with validate_table_name() before calling.
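///
/// # Examples
///
/// A minimal usage sketch, mirroring the other examples in this module; it
/// assumes an already-connected `tokio_postgres::Client`.
///
/// ```no_run
/// # use database_replicator::jsonb::writer::truncate_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// truncate_jsonb_table(client, table_name).await?;
/// # Ok(())
/// # }
/// ```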
pub async fn truncate_jsonb_table(client: &Client, table_name: &str) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table truncation")?;

    tracing::info!("Truncating JSONB table '{}'", table_name);

    let truncate_sql = format!(
        r#"TRUNCATE TABLE "{}" RESTART IDENTITY CASCADE"#,
        table_name
    );

    client
        .execute(&truncate_sql, &[])
        .await
        .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

    let verify_sql = format!(r#"SELECT COUNT(*) FROM "{}""#, table_name);
    let remaining_rows: i64 = client
        .query_one(&verify_sql, &[])
        .await
        .with_context(|| format!("Failed to verify truncate of '{}'", table_name))?
        .get(0);

    if remaining_rows > 0 {
        bail!(
            "Truncate verification failed: table '{}' still has {} rows after truncate",
            table_name,
            remaining_rows
        );
    }

    tracing::info!(
        "Truncated JSONB table '{}' successfully ({} rows remaining)",
        table_name,
        remaining_rows
    );

    Ok(())
}

/// Drop a JSONB table if it exists.
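///
/// # Security
///
/// CRITICAL: table_name MUST be validated with validate_table_name() before calling
/// (this function also re-validates internally, like the other writers in this module).
///
/// # Examples
///
/// A minimal usage sketch in the same style as the other examples here; it
/// assumes an already-connected `tokio_postgres::Client`.
///
/// ```no_run
/// # use database_replicator::jsonb::writer::drop_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// drop_jsonb_table(client, table_name).await?;
/// # Ok(())
/// # }
/// ```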
pub async fn drop_jsonb_table(client: &Client, table_name: &str) -> Result<()> {
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table drop")?;

    tracing::info!("Dropping JSONB table '{}'", table_name);

    let drop_sql = format!(r#"DROP TABLE IF EXISTS "{}" CASCADE"#, table_name);

    client
        .execute(&drop_sql, &[])
        .await
        .with_context(|| format!("Failed to drop JSONB table '{}'", table_name))?;

    tracing::info!("Dropped JSONB table '{}' (if it existed)", table_name);

    Ok(())
}

/// Insert a single JSONB row with metadata
///
/// Inserts a single row into a JSONB table with the original ID, data, and metadata.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `id` - Original document/row ID
/// * `data` - Complete document/row as serde_json::Value
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for id, data, and source_type to prevent injection.
/// table_name must be validated before calling.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_row;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let data = json!({"name": "Alice", "age": 30});
/// insert_jsonb_row(client, table_name, "1", data, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn insert_jsonb_row(
    client: &Client,
    table_name: &str,
    id: &str,
    data: serde_json::Value,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB row insert")?;

    // Use parameterized query for data and metadata (safe from injection)
    // Note: table_name is validated above
    let insert_sql = format!(
        r#"INSERT INTO "{}" (id, data, _source_type) VALUES ($1, $2, $3)"#,
        table_name
    );

    client
        .execute(&insert_sql, &[&id, &data, &source_type])
        .await
        .with_context(|| {
            format!(
                "Failed to insert row with id '{}' into '{}'",
                id, table_name
            )
        })?;

    Ok(())
}

/// Estimate the serialized size of a JSONB row for batch sizing
fn estimate_row_size(id: &str, data: &serde_json::Value) -> usize {
    // Estimate: id length + JSON serialized size + overhead
    id.len() + data.to_string().len() + 50 // 50 bytes overhead for metadata
}

/// Calculate optimal batch size based on row sizes
///
/// Targets ~10MB per batch to stay well under typical PostgreSQL limits
/// while maintaining good throughput.
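///
/// The 1000-row cap also bounds each multi-value INSERT to 3000 bind
/// parameters (3 per row), comfortably under PostgreSQL's 65535-parameter
/// limit (see the batch-size test at the bottom of this file).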
fn calculate_batch_size(rows: &[(String, serde_json::Value)], start_idx: usize) -> usize {
    const TARGET_BATCH_BYTES: usize = 10 * 1024 * 1024; // 10MB target
    const MIN_BATCH_SIZE: usize = 1;
    const MAX_BATCH_SIZE: usize = 1000;

    let mut total_size = 0usize;
    let mut count = 0usize;

    for (id, data) in rows.iter().skip(start_idx) {
        let row_size = estimate_row_size(id, data);
        if total_size + row_size > TARGET_BATCH_BYTES && count > 0 {
            break;
        }
        total_size += row_size;
        count += 1;
        if count >= MAX_BATCH_SIZE {
            break;
        }
    }

    count.max(MIN_BATCH_SIZE)
}

/// Execute a single batch insert with the given rows
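///
/// For two rows, the generated statement has the shape (table name illustrative):
/// `INSERT INTO "users" (id, data, _source_type) VALUES ($1, $2, $3), ($4, $5, $6)`.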
async fn execute_batch_insert(
    client: &Client,
    table_name: &str,
    rows: &[(String, serde_json::Value)],
    source_type: &str,
) -> Result<()> {
    // Build parameterized multi-value INSERT
    let mut value_placeholders = Vec::with_capacity(rows.len());
    let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
        Vec::with_capacity(rows.len() * 3);

    for (idx, (id, data)) in rows.iter().enumerate() {
        let param_base = idx * 3 + 1;
        value_placeholders.push(format!(
            "(${}, ${}, ${})",
            param_base,
            param_base + 1,
            param_base + 2
        ));
        params.push(id);
        params.push(data);
        params.push(&source_type);
    }

    let insert_sql = format!(
        r#"INSERT INTO "{}" (id, data, _source_type) VALUES {}"#,
        table_name,
        value_placeholders.join(", ")
    );

    client.execute(&insert_sql, &params).await?;
    Ok(())
}

/// Insert multiple JSONB rows with adaptive batching
///
/// Inserts multiple rows efficiently using multi-value INSERT statements.
/// Automatically adjusts batch size based on row payload sizes and falls back
/// to row-by-row inserts when a batch fails with a connection error.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `rows` - Vector of (id, data) tuples
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for all data. table_name must be validated.
///
/// # Performance
///
/// - Dynamically calculates batch size based on estimated payload size
/// - Targets ~10MB per batch for optimal throughput
/// - Falls back to row-by-row inserts when a batch hits a connection error
/// - Shows progress for large datasets
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_batch;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let rows = vec![
///     ("1".to_string(), json!({"name": "Alice", "age": 30})),
///     ("2".to_string(), json!({"name": "Bob", "age": 25})),
/// ];
/// insert_jsonb_batch(client, table_name, rows, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn insert_jsonb_batch(
    client: &Client,
    table_name: &str,
    rows: Vec<(String, serde_json::Value)>,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB batch insert")?;

    if rows.is_empty() {
        return Ok(());
    }

    let total_rows = rows.len();
    tracing::info!(
        "Inserting {} rows into JSONB table '{}'",
        total_rows,
        table_name
    );

    let mut inserted = 0usize;
    let mut consecutive_failures = 0u32;
    const MAX_RETRIES: u32 = 5;

    while inserted < total_rows {
        // Calculate optimal batch size based on remaining rows
        let batch_size = calculate_batch_size(&rows, inserted);
        let end_idx = (inserted + batch_size).min(total_rows);
        let batch = &rows[inserted..end_idx];

        // Log progress for large datasets
        if total_rows > 10_000 && inserted > 0 && inserted % 50_000 == 0 {
            let pct = (inserted as f64 / total_rows as f64 * 100.0) as u32;
            tracing::info!(
                "  Progress: {}/{} rows ({}%) inserted into '{}'",
                inserted,
                total_rows,
                pct,
                table_name
            );
        }

        match execute_batch_insert(client, table_name, batch, source_type).await {
            Ok(()) => {
                tracing::debug!(
                    "Inserted batch of {} rows ({}-{}/{}) into '{}'",
                    batch.len(),
                    inserted,
                    end_idx,
                    total_rows,
                    table_name
                );
                inserted = end_idx;
                consecutive_failures = 0;
            }
            Err(e) => {
                consecutive_failures += 1;
                let is_connection_error = e.to_string().contains("connection")
                    || e.to_string().contains("closed")
                    || e.to_string().contains("communicating");

                if is_connection_error && consecutive_failures <= MAX_RETRIES && batch.len() > 1 {
                    // Connection error with a multi-row batch: fall back to row-by-row inserts
                    tracing::warn!(
                        "Batch insert of {} rows failed (attempt {}/{}), falling back to row-by-row inserts",
                        batch.len(),
                        consecutive_failures,
                        MAX_RETRIES
                    );

                    // Insert this batch row-by-row as fallback
                    for (idx, (id, data)) in batch.iter().enumerate() {
                        if let Err(row_err) =
                            insert_jsonb_row(client, table_name, id, data.clone(), source_type)
                                .await
                        {
                            return Err(row_err).with_context(|| {
                                format!(
                                    "Failed to insert row {} (id='{}') into '{}' after batch failure",
                                    inserted + idx,
                                    id,
                                    table_name
                                )
                            });
                        }
                    }
                    inserted = end_idx;
                    consecutive_failures = 0;
                    tracing::info!(
                        "Successfully inserted {} rows individually after batch failure",
                        batch.len()
                    );
                } else {
                    // Non-recoverable error or too many retries
                    return Err(e).with_context(|| {
                        format!(
                            "Failed to insert batch ({} rows at offset {}) into '{}'",
                            batch.len(),
                            inserted,
                            table_name
                        )
                    });
                }
            }
        }
    }

    tracing::info!(
        "Successfully inserted {} rows into '{}'",
        total_rows,
        table_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_batch_insert_empty() {
        // Empty batch should not error
        // (actual async test requires test database)
    }

    #[test]
    fn test_batch_size_calculation() {
        // Verify our batch size doesn't exceed parameter limits
        // PostgreSQL parameter limit is 65535
        // With 3 params per row (id, data, source_type) and 1000 rows per batch:
        // 1000 * 3 = 3000 parameters per batch, which is well under the limit
        let batch_size = 1000_usize;
        let params_per_row = 3_usize;
        let total_params = batch_size * params_per_row;
        assert!(
            total_params < 65535,
            "Batch size {} * {} params = {} exceeds PostgreSQL limit of 65535",
            batch_size,
            params_per_row,
            total_params
        );
    }
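
    // A small sketch test for calculate_batch_size; it assumes the constants
    // defined in this file (~10MB byte target, 1000-row cap, minimum of 1).
    // With tiny rows the byte target is never reached, so the row cap decides.
    #[test]
    fn test_calculate_batch_size_caps_at_max_rows() {
        let rows: Vec<(String, serde_json::Value)> = (0..2000)
            .map(|i| (i.to_string(), serde_json::json!({ "n": i })))
            .collect();
        assert_eq!(super::calculate_batch_size(&rows, 0), 1000);
    }

    // When no rows remain past start_idx, the function falls back to the
    // minimum batch size of 1 rather than returning 0.
    #[test]
    fn test_calculate_batch_size_empty_input() {
        let rows: Vec<(String, serde_json::Value)> = Vec::new();
        assert_eq!(super::calculate_batch_size(&rows, 0), 1);
    }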
}