database_replicator/jsonb/writer.rs
// ABOUTME: Write JSONB data to PostgreSQL with metadata
// ABOUTME: Handles table creation, single row inserts, and batch inserts

use anyhow::{bail, Context, Result};
use tokio_postgres::Client;

/// Create a table with JSONB schema for storing non-PostgreSQL data
///
/// Creates a table with the following structure:
/// - id: TEXT PRIMARY KEY (original document/row ID)
/// - data: JSONB NOT NULL (complete document/row as JSON)
/// - _source_type: TEXT NOT NULL ('sqlite', 'mongodb', or 'mysql')
/// - _migrated_at: TIMESTAMP NOT NULL DEFAULT NOW()
///
/// Also creates two indexes:
/// - GIN index on data column for efficient JSONB queries
/// - Index on _migrated_at for temporal queries
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table to create (must be validated)
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// CRITICAL: table_name MUST be validated with validate_table_name() before calling.
/// This function uses table_name in SQL directly (not parameterized) after validation.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::create_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// create_jsonb_table(client, table_name, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn create_jsonb_table(
    client: &Client,
    table_name: &str,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table creation")?;

    tracing::info!(
        "Creating JSONB table '{}' for source type '{}'",
        table_name,
        source_type
    );

    // Create table with JSONB schema
    // Note: table_name is validated above, so it's safe to use in SQL
    let create_table_sql = format!(
        r#"
        CREATE TABLE IF NOT EXISTS "{}" (
            id TEXT PRIMARY KEY,
            data JSONB NOT NULL,
            _source_type TEXT NOT NULL,
            _migrated_at TIMESTAMP NOT NULL DEFAULT NOW()
        )
        "#,
        table_name
    );

    client
        .execute(&create_table_sql, &[])
        .await
        .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

    // Create GIN index on data column for efficient JSONB queries
    let create_gin_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_data" ON "{}" USING GIN (data)"#,
        table_name, table_name
    );

    client
        .execute(&create_gin_index_sql, &[])
        .await
        .with_context(|| format!("Failed to create GIN index on table '{}'", table_name))?;

    // Create index on _migrated_at for temporal queries
    let create_time_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_migrated" ON "{}" (_migrated_at)"#,
        table_name, table_name
    );

    client
        .execute(&create_time_index_sql, &[])
        .await
        .with_context(|| {
            format!(
                "Failed to create _migrated_at index on table '{}'",
                table_name
            )
        })?;

    tracing::info!(
        "Successfully created JSONB table '{}' with indexes",
        table_name
    );

    Ok(())
}

/// Truncate a JSONB table to remove all existing data
///
/// This is used to make init idempotent - rerunning init will clear existing
/// data before inserting fresh data from the source.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table to truncate (must be validated)
///
/// # Security
///
/// CRITICAL: table_name MUST be validated with validate_table_name() before calling.
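///
/// # Examples
///
/// A minimal usage sketch (assuming a connected `tokio_postgres::Client` and an
/// existing JSONB table):
///
/// ```no_run
/// # use database_replicator::jsonb::writer::truncate_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// truncate_jsonb_table(client, table_name).await?;
/// # Ok(())
/// # }
/// ```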
pub async fn truncate_jsonb_table(client: &Client, table_name: &str) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table truncation")?;

    tracing::info!("Truncating JSONB table '{}'", table_name);

    let truncate_sql = format!(
        r#"TRUNCATE TABLE "{}" RESTART IDENTITY CASCADE"#,
        table_name
    );

    client
        .execute(&truncate_sql, &[])
        .await
        .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

    let verify_sql = format!(r#"SELECT COUNT(*) FROM "{}""#, table_name);
    let remaining_rows: i64 = client
        .query_one(&verify_sql, &[])
        .await
        .with_context(|| format!("Failed to verify truncate of '{}'", table_name))?
        .get(0);

    if remaining_rows > 0 {
        bail!(
            "Truncate verification failed: table '{}' still has {} rows after truncate",
            table_name,
            remaining_rows
        );
    }

    tracing::info!(
        "Truncated JSONB table '{}' successfully ({} rows remaining)",
        table_name,
        remaining_rows
    );

    Ok(())
}

/// Drop a JSONB table if it exists.
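///
/// The table name is validated before use; dropping a table that does not exist
/// is not an error.
///
/// # Examples
///
/// A minimal usage sketch (assuming a connected `tokio_postgres::Client`):
///
/// ```no_run
/// # use database_replicator::jsonb::writer::drop_jsonb_table;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// drop_jsonb_table(client, "users").await?;
/// # Ok(())
/// # }
/// ```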
pub async fn drop_jsonb_table(client: &Client, table_name: &str) -> Result<()> {
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table drop")?;

    tracing::info!("Dropping JSONB table '{}'", table_name);

    let drop_sql = format!(r#"DROP TABLE IF EXISTS "{}" CASCADE"#, table_name);

    client
        .execute(&drop_sql, &[])
        .await
        .with_context(|| format!("Failed to drop JSONB table '{}'", table_name))?;

    tracing::info!("Dropped JSONB table '{}' (if it existed)", table_name);

    Ok(())
}

/// Insert a single JSONB row with metadata
///
/// Inserts a single row into a JSONB table with the original ID, data, and metadata.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `id` - Original document/row ID
/// * `data` - Complete document/row as serde_json::Value
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for id, data, and source_type to prevent injection.
/// table_name must be validated before calling.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_row;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let data = json!({"name": "Alice", "age": 30});
/// insert_jsonb_row(client, table_name, "1", data, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn insert_jsonb_row(
    client: &Client,
    table_name: &str,
    id: &str,
    data: serde_json::Value,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB row insert")?;

    // Use parameterized query for data and metadata (safe from injection)
    // Note: table_name is validated above
    let insert_sql = format!(
        r#"INSERT INTO "{}" (id, data, _source_type) VALUES ($1, $2, $3)"#,
        table_name
    );

    client
        .execute(&insert_sql, &[&id, &data, &source_type])
        .await
        .with_context(|| {
            format!(
                "Failed to insert row with id '{}' into '{}'",
                id, table_name
            )
        })?;

    Ok(())
}

/// Estimate the serialized size of a JSONB row for batch sizing
fn estimate_row_size(id: &str, data: &serde_json::Value) -> usize {
    // Estimate: id length + JSON serialized size + overhead
    id.len() + data.to_string().len() + 50 // 50 bytes overhead for metadata
}

/// Calculate optimal batch size based on row sizes
///
/// Targets ~10MB per batch to stay well under typical PostgreSQL limits
/// while maintaining good throughput.
fn calculate_batch_size(rows: &[(String, serde_json::Value)], start_idx: usize) -> usize {
    const TARGET_BATCH_BYTES: usize = 10 * 1024 * 1024; // 10MB target
    const MIN_BATCH_SIZE: usize = 1;
    const MAX_BATCH_SIZE: usize = 1000;

    let mut total_size = 0usize;
    let mut count = 0usize;

    for (id, data) in rows.iter().skip(start_idx) {
        let row_size = estimate_row_size(id, data);
        if total_size + row_size > TARGET_BATCH_BYTES && count > 0 {
            break;
        }
        total_size += row_size;
        count += 1;
        if count >= MAX_BATCH_SIZE {
            break;
        }
    }

    count.max(MIN_BATCH_SIZE)
}

/// Execute a single batch insert with the given rows
async fn execute_batch_insert(
    client: &Client,
    table_name: &str,
    rows: &[(String, serde_json::Value)],
    source_type: &str,
) -> Result<()> {
    // Build parameterized multi-value INSERT
    let mut value_placeholders = Vec::with_capacity(rows.len());
    let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
        Vec::with_capacity(rows.len() * 3);

    for (idx, (id, data)) in rows.iter().enumerate() {
        let param_base = idx * 3 + 1;
        value_placeholders.push(format!(
            "(${}, ${}, ${})",
            param_base,
            param_base + 1,
            param_base + 2
        ));
        params.push(id);
        params.push(data);
        params.push(&source_type);
    }

    let insert_sql = format!(
        r#"INSERT INTO "{}" (id, data, _source_type) VALUES {}"#,
        table_name,
        value_placeholders.join(", ")
    );

    client.execute(&insert_sql, &params).await?;
    Ok(())
}

/// Insert multiple JSONB rows with adaptive batching
///
/// Inserts multiple rows efficiently using multi-value INSERT statements.
/// Automatically adjusts batch size based on row payload sizes and falls back
/// to row-by-row inserts when a batch fails with a connection error.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `rows` - Vector of (id, data) tuples
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for all data. table_name must be validated.
///
/// # Performance
///
/// - Dynamically calculates batch size based on estimated payload size
/// - Targets ~10MB per batch for optimal throughput
/// - Falls back to row-by-row inserts when a batch fails with a connection error
/// - Shows progress for large datasets
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_batch;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let rows = vec![
///     ("1".to_string(), json!({"name": "Alice", "age": 30})),
///     ("2".to_string(), json!({"name": "Bob", "age": 25})),
/// ];
/// insert_jsonb_batch(client, table_name, rows, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn insert_jsonb_batch(
    client: &Client,
    table_name: &str,
    rows: Vec<(String, serde_json::Value)>,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB batch insert")?;

    if rows.is_empty() {
        return Ok(());
    }

    let total_rows = rows.len();
    tracing::info!(
        "Inserting {} rows into JSONB table '{}'",
        total_rows,
        table_name
    );

    let mut inserted = 0usize;
    let mut consecutive_failures = 0u32;
    const MAX_RETRIES: u32 = 5;

    while inserted < total_rows {
        // Calculate optimal batch size based on remaining rows
        let batch_size = calculate_batch_size(&rows, inserted);
        let end_idx = (inserted + batch_size).min(total_rows);
        let batch = &rows[inserted..end_idx];

        // Log progress for large datasets
        if total_rows > 10000 && inserted.checked_rem(50000) == Some(0) {
            let pct = (inserted as f64 / total_rows as f64 * 100.0) as u32;
            tracing::info!(
                " Progress: {}/{} rows ({}%) inserted into '{}'",
                inserted,
                total_rows,
                pct,
                table_name
            );
        }

        match execute_batch_insert(client, table_name, batch, source_type).await {
            Ok(()) => {
                tracing::debug!(
                    "Inserted batch of {} rows ({}-{}/{}) into '{}'",
                    batch.len(),
                    inserted,
                    end_idx,
                    total_rows,
                    table_name
                );
                inserted = end_idx;
                consecutive_failures = 0;
            }
            Err(e) => {
                consecutive_failures += 1;
                let is_connection_error = e.to_string().contains("connection")
                    || e.to_string().contains("closed")
                    || e.to_string().contains("communicating");

                if is_connection_error && consecutive_failures <= MAX_RETRIES && batch.len() > 1 {
                    // Connection error on a multi-row batch - fall back to row-by-row inserts
                    tracing::warn!(
                        "Batch insert of {} rows failed (attempt {}/{}), falling back to row-by-row inserts",
                        batch.len(),
                        consecutive_failures,
                        MAX_RETRIES
                    );

                    // Insert this batch row-by-row as fallback
                    for (idx, (id, data)) in batch.iter().enumerate() {
                        if let Err(row_err) =
                            insert_jsonb_row(client, table_name, id, data.clone(), source_type)
                                .await
                        {
                            return Err(row_err).with_context(|| {
                                format!(
                                    "Failed to insert row {} (id='{}') into '{}' after batch failure",
                                    inserted + idx,
                                    id,
                                    table_name
                                )
                            });
                        }
                    }
                    inserted = end_idx;
                    consecutive_failures = 0;
                    tracing::info!(
                        "Successfully inserted {} rows individually after batch failure",
                        batch.len()
                    );
                } else {
                    // Non-recoverable error or too many retries
                    return Err(e).with_context(|| {
                        format!(
                            "Failed to insert batch ({} rows at offset {}) into '{}'",
                            batch.len(),
                            inserted,
                            table_name
                        )
                    });
                }
            }
        }
    }

    tracing::info!(
        "Successfully inserted {} rows into '{}'",
        total_rows,
        table_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_batch_insert_empty() {
        // Empty batch should not error
        // (actual async test requires test database)
    }

    #[test]
    fn test_batch_size_calculation() {
        // Verify our batch size doesn't exceed parameter limits
        // PostgreSQL parameter limit is 65535
        // With 3 params per row (id, data, source_type) and 1000 rows per batch:
        // 1000 * 3 = 3000 parameters per batch, which is well under the limit
        let batch_size = 1000_usize;
        let params_per_row = 3_usize;
        let total_params = batch_size * params_per_row;
        assert!(
            total_params < 65535,
            "Batch size {} * {} params = {} exceeds PostgreSQL limit of 65535",
            batch_size,
            params_per_row,
            total_params
        );
    }
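
    // A small sketch of the adaptive sizing behaviour of calculate_batch_size:
    // with tiny rows the 1000-row cap applies, and a single row larger than the
    // ~10MB target is still batched on its own so progress is always possible.
    // The row contents below are hypothetical test fixtures.
    #[test]
    fn test_calculate_batch_size_bounds() {
        // Many small rows: the byte target is never reached, so the row cap applies
        let small_rows: Vec<(String, serde_json::Value)> = (0..2000)
            .map(|i| (i.to_string(), serde_json::json!({ "value": i })))
            .collect();
        assert_eq!(super::calculate_batch_size(&small_rows, 0), 1000);

        // A single row larger than the target still forms a batch of one
        let big_payload = "x".repeat(11 * 1024 * 1024);
        let big_rows = vec![
            ("1".to_string(), serde_json::json!({ "blob": big_payload })),
            ("2".to_string(), serde_json::json!({ "value": 2 })),
        ];
        assert_eq!(super::calculate_batch_size(&big_rows, 0), 1);
    }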
}