database_replicator/jsonb/writer.rs

// ABOUTME: Write JSONB data to PostgreSQL with metadata
// ABOUTME: Handles table creation, single row inserts, and batch inserts

use anyhow::{Context, Result};
use tokio_postgres::Client;

/// Create a table with JSONB schema for storing non-PostgreSQL data
///
/// Creates a table with the following structure:
/// - id: TEXT PRIMARY KEY (original document/row ID)
/// - data: JSONB NOT NULL (complete document/row as JSON)
/// - _source_type: TEXT NOT NULL ('sqlite', 'mongodb', or 'mysql')
/// - _migrated_at: TIMESTAMP NOT NULL DEFAULT NOW()
///
/// Also creates two indexes:
/// - GIN index on data column for efficient JSONB queries
/// - Index on _migrated_at for temporal queries
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table to create (must be validated)
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// CRITICAL: table_name MUST be validated with validate_table_name() before calling.
/// This function uses table_name in SQL directly (not parameterized) after validation.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::create_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// create_jsonb_table(client, table_name, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn create_jsonb_table(
    client: &Client,
    table_name: &str,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table creation")?;

    tracing::info!(
        "Creating JSONB table '{}' for source type '{}'",
        table_name,
        source_type
    );

    // Create table with JSONB schema
    // Note: table_name is validated above, so it's safe to use in SQL
    let create_table_sql = format!(
        r#"
        CREATE TABLE IF NOT EXISTS "{}" (
            id TEXT PRIMARY KEY,
            data JSONB NOT NULL,
            _source_type TEXT NOT NULL,
            _migrated_at TIMESTAMP NOT NULL DEFAULT NOW()
        )
        "#,
        table_name
    );

    client
        .execute(&create_table_sql, &[])
        .await
        .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

    // Create GIN index on data column for efficient JSONB queries
    let create_gin_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_data" ON "{}" USING GIN (data)"#,
        table_name, table_name
    );

    client
        .execute(&create_gin_index_sql, &[])
        .await
        .with_context(|| format!("Failed to create GIN index on table '{}'", table_name))?;

    // Create index on _migrated_at for temporal queries
    let create_time_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_migrated" ON "{}" (_migrated_at)"#,
        table_name, table_name
    );

    client
        .execute(&create_time_index_sql, &[])
        .await
        .with_context(|| {
            format!(
                "Failed to create _migrated_at index on table '{}'",
                table_name
            )
        })?;

    tracing::info!(
        "Successfully created JSONB table '{}' with indexes",
        table_name
    );

    Ok(())
}

/// Truncate a JSONB table to remove all existing data
///
/// This is used to make init idempotent - rerunning init will clear existing
/// data before inserting fresh data from the source.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table to truncate (must be validated)
///
/// # Security
///
/// CRITICAL: table_name MUST be validated with validate_table_name() before calling.
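///
/// # Examples
///
/// A minimal usage sketch, mirroring the `create_jsonb_table` example above:
///
/// ```no_run
/// # use database_replicator::jsonb::writer::truncate_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// truncate_jsonb_table(client, table_name).await?;
/// # Ok(())
/// # }
/// ```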
pub async fn truncate_jsonb_table(client: &Client, table_name: &str) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table truncation")?;

    tracing::debug!("Truncating JSONB table '{}'", table_name);

    let truncate_sql = format!(
        r#"TRUNCATE TABLE "{}" RESTART IDENTITY CASCADE"#,
        table_name
    );

    client
        .execute(&truncate_sql, &[])
        .await
        .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

    tracing::debug!("Truncated JSONB table '{}'", table_name);

    Ok(())
}

/// Insert a single JSONB row with metadata
///
/// Inserts a single row into a JSONB table with the original ID, data, and metadata.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `id` - Original document/row ID
/// * `data` - Complete document/row as serde_json::Value
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for id, data, and source_type to prevent injection.
/// table_name must be validated before calling.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_row;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let data = json!({"name": "Alice", "age": 30});
/// insert_jsonb_row(client, table_name, "1", data, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn insert_jsonb_row(
    client: &Client,
    table_name: &str,
    id: &str,
    data: serde_json::Value,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB row insert")?;

    // Use parameterized query for data and metadata (safe from injection)
    // Note: table_name is validated above
    let insert_sql = format!(
        r#"INSERT INTO "{}" (id, data, _source_type) VALUES ($1, $2, $3)"#,
        table_name
    );

    client
        .execute(&insert_sql, &[&id, &data, &source_type])
        .await
        .with_context(|| {
            format!(
                "Failed to insert row with id '{}' into '{}'",
                id, table_name
            )
        })?;

    Ok(())
}

/// Estimate the serialized size of a JSONB row for batch sizing
fn estimate_row_size(id: &str, data: &serde_json::Value) -> usize {
    // Estimate: id length + JSON serialized size + overhead
    id.len() + data.to_string().len() + 50 // 50 bytes overhead for metadata
}

/// Calculate optimal batch size based on row sizes
///
/// Targets ~10MB per batch to stay well under typical PostgreSQL limits
/// while maintaining good throughput.
fn calculate_batch_size(rows: &[(String, serde_json::Value)], start_idx: usize) -> usize {
    const TARGET_BATCH_BYTES: usize = 10 * 1024 * 1024; // 10MB target
    const MIN_BATCH_SIZE: usize = 1;
    const MAX_BATCH_SIZE: usize = 1000;

    let mut total_size = 0usize;
    let mut count = 0usize;

    for (id, data) in rows.iter().skip(start_idx) {
        let row_size = estimate_row_size(id, data);
        if total_size + row_size > TARGET_BATCH_BYTES && count > 0 {
            break;
        }
        total_size += row_size;
        count += 1;
        if count >= MAX_BATCH_SIZE {
            break;
        }
    }

    count.max(MIN_BATCH_SIZE)
}

/// Execute a single batch insert with the given rows
async fn execute_batch_insert(
    client: &Client,
    table_name: &str,
    rows: &[(String, serde_json::Value)],
    source_type: &str,
) -> Result<()> {
    // Build parameterized multi-value INSERT
    let mut value_placeholders = Vec::with_capacity(rows.len());
    let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
        Vec::with_capacity(rows.len() * 3);

    for (idx, (id, data)) in rows.iter().enumerate() {
        let param_base = idx * 3 + 1;
        value_placeholders.push(format!(
            "(${}, ${}, ${})",
            param_base,
            param_base + 1,
            param_base + 2
        ));
        params.push(id);
        params.push(data);
        params.push(&source_type);
    }

    let insert_sql = format!(
        r#"INSERT INTO "{}" (id, data, _source_type) VALUES {}"#,
        table_name,
        value_placeholders.join(", ")
    );

    client.execute(&insert_sql, &params).await?;
    Ok(())
}

/// Insert multiple JSONB rows with adaptive batching
///
/// Inserts multiple rows efficiently using multi-value INSERT statements.
/// Batch size is calculated from the estimated payload size of the rows, and a
/// batch that fails with a connection error is retried row by row.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `rows` - Vector of (id, data) tuples
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for all data. table_name must be validated.
///
/// # Performance
///
/// - Dynamically calculates batch size based on estimated payload size
/// - Targets ~10MB per batch for optimal throughput
/// - Falls back to row-by-row inserts when a batch fails with a connection error
/// - Shows progress for large datasets
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_batch;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let rows = vec![
///     ("1".to_string(), json!({"name": "Alice", "age": 30})),
///     ("2".to_string(), json!({"name": "Bob", "age": 25})),
/// ];
/// insert_jsonb_batch(client, table_name, rows, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn insert_jsonb_batch(
    client: &Client,
    table_name: &str,
    rows: Vec<(String, serde_json::Value)>,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB batch insert")?;

    if rows.is_empty() {
        return Ok(());
    }

    let total_rows = rows.len();
    tracing::info!(
        "Inserting {} rows into JSONB table '{}'",
        total_rows,
        table_name
    );

    let mut inserted = 0usize;
    let mut consecutive_failures = 0u32;
    const MAX_RETRIES: u32 = 5;

    while inserted < total_rows {
        // Calculate optimal batch size based on remaining rows
        let batch_size = calculate_batch_size(&rows, inserted);
        let end_idx = (inserted + batch_size).min(total_rows);
        let batch = &rows[inserted..end_idx];

        // Log progress for large datasets
        if total_rows > 10000 && inserted.checked_rem(50000) == Some(0) {
            let pct = (inserted as f64 / total_rows as f64 * 100.0) as u32;
            tracing::info!(
                "  Progress: {}/{} rows ({}%) inserted into '{}'",
                inserted,
                total_rows,
                pct,
                table_name
            );
        }

        match execute_batch_insert(client, table_name, batch, source_type).await {
            Ok(()) => {
                tracing::debug!(
                    "Inserted batch of {} rows ({}-{}/{}) into '{}'",
                    batch.len(),
                    inserted,
                    end_idx,
                    total_rows,
                    table_name
                );
                inserted = end_idx;
                consecutive_failures = 0;
            }
            Err(e) => {
                consecutive_failures += 1;
                let is_connection_error = e.to_string().contains("connection")
                    || e.to_string().contains("closed")
                    || e.to_string().contains("communicating");

                if is_connection_error && consecutive_failures <= MAX_RETRIES && batch.len() > 1 {
                    // Connection error with a multi-row batch - fall back to inserting
                    // the batch row by row
                    tracing::warn!(
                        "Batch insert of {} rows failed (attempt {}/{}), falling back to row-by-row inserts",
                        batch.len(),
                        consecutive_failures,
                        MAX_RETRIES
                    );

                    // Insert this batch row-by-row as fallback
                    for (idx, (id, data)) in batch.iter().enumerate() {
                        if let Err(row_err) =
                            insert_jsonb_row(client, table_name, id, data.clone(), source_type)
                                .await
                        {
                            return Err(row_err).with_context(|| {
                                format!(
                                    "Failed to insert row {} (id='{}') into '{}' after batch failure",
                                    inserted + idx,
                                    id,
                                    table_name
                                )
                            });
                        }
                    }
                    inserted = end_idx;
                    consecutive_failures = 0;
                    tracing::info!(
                        "Successfully inserted {} rows individually after batch failure",
                        batch.len()
                    );
                } else {
                    // Non-recoverable error or too many retries
                    return Err(e).with_context(|| {
                        format!(
                            "Failed to insert batch ({} rows at offset {}) into '{}'",
                            batch.len(),
                            inserted,
                            table_name
                        )
                    });
                }
            }
        }
    }

    tracing::info!(
        "Successfully inserted {} rows into '{}'",
        total_rows,
        table_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_batch_insert_empty() {
        // Empty batch should not error
        // (actual async test requires test database)
    }
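
    // Illustrative addition (not from the original file): a small sketch of a unit
    // test for calculate_batch_size, assuming its current 10MB target and 1000-row cap.
    #[test]
    fn test_calculate_batch_size_bounds() {
        // Many tiny rows: the 10MB target is never reached, so the 1000-row cap applies.
        let small_rows: Vec<(String, serde_json::Value)> = (0..2000)
            .map(|i| (i.to_string(), serde_json::json!({ "n": i })))
            .collect();
        assert_eq!(super::calculate_batch_size(&small_rows, 0), 1000);

        // A single row larger than the 10MB target still yields a batch of one.
        let blob = "x".repeat(11 * 1024 * 1024);
        let big_rows = vec![("1".to_string(), serde_json::json!({ "blob": blob }))];
        assert_eq!(super::calculate_batch_size(&big_rows, 0), 1);
    }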

    #[test]
    fn test_batch_size_calculation() {
        // Verify our batch size doesn't exceed parameter limits
        // PostgreSQL parameter limit is 65535
        // With 3 params per row (id, data, source_type) and 1000 rows per batch:
        // 1000 * 3 = 3000 parameters per batch, which is well under the limit
        let batch_size = 1000_usize;
        let params_per_row = 3_usize;
        let total_params = batch_size * params_per_row;
        assert!(
            total_params < 65535,
            "Batch size {} * {} params = {} exceeds PostgreSQL limit of 65535",
            batch_size,
            params_per_row,
            total_params
        );
    }
461}