database_replicator/jsonb/writer.rs
// ABOUTME: Write JSONB data to PostgreSQL with metadata
// ABOUTME: Handles table creation, single row inserts, and batch inserts

use anyhow::{Context, Result};
use tokio_postgres::Client;

/// Create a table with JSONB schema for storing non-PostgreSQL data
///
/// Creates a table with the following structure:
/// - id: TEXT PRIMARY KEY (original document/row ID)
/// - data: JSONB NOT NULL (complete document/row as JSON)
/// - _source_type: TEXT NOT NULL ('sqlite', 'mongodb', or 'mysql')
/// - _migrated_at: TIMESTAMP NOT NULL DEFAULT NOW()
///
/// Also creates two indexes:
/// - GIN index on the data column for efficient JSONB queries
/// - Index on _migrated_at for temporal queries
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table to create (must be validated)
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// CRITICAL: table_name MUST be validated with validate_table_name() before calling.
/// This function uses table_name directly in SQL (not parameterized) after validation.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::create_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// create_jsonb_table(client, table_name, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn create_jsonb_table(
    client: &Client,
    table_name: &str,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table creation")?;

    tracing::info!(
        "Creating JSONB table '{}' for source type '{}'",
        table_name,
        source_type
    );

    // Create table with JSONB schema
    // Note: table_name is validated above, so it's safe to use in SQL
    let create_table_sql = format!(
        r#"
        CREATE TABLE IF NOT EXISTS "{}" (
            id TEXT PRIMARY KEY,
            data JSONB NOT NULL,
            _source_type TEXT NOT NULL,
            _migrated_at TIMESTAMP NOT NULL DEFAULT NOW()
        )
        "#,
        table_name
    );

    client
        .execute(&create_table_sql, &[])
        .await
        .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

    // Create GIN index on data column for efficient JSONB queries
    let create_gin_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_data" ON "{}" USING GIN (data)"#,
        table_name, table_name
    );

    client
        .execute(&create_gin_index_sql, &[])
        .await
        .with_context(|| format!("Failed to create GIN index on table '{}'", table_name))?;

    // Create index on _migrated_at for temporal queries
    let create_time_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_migrated" ON "{}" (_migrated_at)"#,
        table_name, table_name
    );

    client
        .execute(&create_time_index_sql, &[])
        .await
        .with_context(|| {
            format!(
                "Failed to create _migrated_at index on table '{}'",
                table_name
            )
        })?;

    tracing::info!(
        "Successfully created JSONB table '{}' with indexes",
        table_name
    );

    Ok(())
}

/// Truncate a JSONB table to remove all existing data
///
/// This is used to make init idempotent - rerunning init will clear existing
/// data before inserting fresh data from the source.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table to truncate (must be validated)
///
/// # Security
///
/// CRITICAL: table_name MUST be validated with validate_table_name() before calling.
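///
/// # Examples
///
/// A minimal sketch in the same style as the other examples in this module:
///
/// ```no_run
/// # use database_replicator::jsonb::writer::truncate_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// truncate_jsonb_table(client, table_name).await?;
/// # Ok(())
/// # }
/// ```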
pub async fn truncate_jsonb_table(client: &Client, table_name: &str) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table truncation")?;

    tracing::debug!("Truncating JSONB table '{}'", table_name);

    let truncate_sql = format!(
        r#"TRUNCATE TABLE "{}" RESTART IDENTITY CASCADE"#,
        table_name
    );

    client
        .execute(&truncate_sql, &[])
        .await
        .with_context(|| format!("Failed to truncate JSONB table '{}'", table_name))?;

    tracing::debug!("Truncated JSONB table '{}'", table_name);

    Ok(())
}

/// Insert a single JSONB row with metadata
///
/// Inserts a single row into a JSONB table with the original ID, data, and metadata.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `id` - Original document/row ID
/// * `data` - Complete document/row as serde_json::Value
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for id, data, and source_type to prevent injection.
/// table_name must be validated before calling.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_row;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let data = json!({"name": "Alice", "age": 30});
/// insert_jsonb_row(client, table_name, "1", data, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn insert_jsonb_row(
    client: &Client,
    table_name: &str,
    id: &str,
    data: serde_json::Value,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB row insert")?;

    // Use parameterized query for data and metadata (safe from injection)
    // Note: table_name is validated above
    let insert_sql = format!(
        r#"INSERT INTO "{}" (id, data, _source_type) VALUES ($1, $2, $3)"#,
        table_name
    );

    client
        .execute(&insert_sql, &[&id, &data, &source_type])
        .await
        .with_context(|| {
            format!(
                "Failed to insert row with id '{}' into '{}'",
                id, table_name
            )
        })?;

    Ok(())
}

/// Insert multiple JSONB rows in a batch
///
/// Inserts multiple rows efficiently using a multi-value INSERT statement.
/// This is significantly faster than individual inserts for large datasets.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `rows` - Vector of (id, data) tuples
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for all data. table_name must be validated.
///
/// # Performance
///
/// Batches rows into groups of 1000 to avoid PostgreSQL parameter limits.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_batch;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let rows = vec![
///     ("1".to_string(), json!({"name": "Alice", "age": 30})),
///     ("2".to_string(), json!({"name": "Bob", "age": 25})),
/// ];
/// insert_jsonb_batch(client, table_name, rows, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn insert_jsonb_batch(
    client: &Client,
    table_name: &str,
    rows: Vec<(String, serde_json::Value)>,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB batch insert")?;

    if rows.is_empty() {
        return Ok(());
    }

    tracing::info!(
        "Inserting {} rows into JSONB table '{}'",
        rows.len(),
        table_name
    );

    // Batch inserts to stay under the parameter limit: PostgreSQL allows at
    // most 65535 bind parameters per statement. With 3 parameters per row
    // (id, data, source_type), that caps a single statement at ~21845 rows,
    // so a conservative 1000 rows per batch leaves plenty of headroom.
    const BATCH_SIZE: usize = 1000;

    for (batch_num, chunk) in rows.chunks(BATCH_SIZE).enumerate() {
        // Build a parameterized multi-value INSERT
        // Format: INSERT INTO table (cols) VALUES ($1,$2,$3),($4,$5,$6),...
        let mut value_placeholders = Vec::with_capacity(chunk.len());
        let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
            Vec::with_capacity(chunk.len() * 3);

        for (idx, (id, data)) in chunk.iter().enumerate() {
            let param_base = idx * 3 + 1;
            value_placeholders.push(format!(
                "(${}, ${}, ${})",
                param_base,
                param_base + 1,
                param_base + 2
            ));

            // Add parameters in order: id, data, source_type
            params.push(id);
            params.push(data);
            params.push(&source_type);
        }

        let insert_sql = format!(
            r#"INSERT INTO "{}" (id, data, _source_type) VALUES {}"#,
            table_name,
            value_placeholders.join(", ")
        );

        client
            .execute(&insert_sql, &params)
            .await
            .with_context(|| {
                format!(
                    "Failed to insert batch {} ({} rows) into '{}'",
                    batch_num,
                    chunk.len(),
                    table_name
                )
            })?;

        tracing::debug!(
            "Inserted batch {} ({} rows) into '{}'",
            batch_num,
            chunk.len(),
            table_name
        );
    }

    tracing::info!(
        "Successfully inserted {} rows into '{}'",
        rows.len(),
        table_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_batch_insert_empty() {
        // Empty batch should not error
        // (an actual async test requires a test database)
    }

    #[test]
    fn test_batch_size_calculation() {
        // Verify our batch size doesn't exceed parameter limits.
        // PostgreSQL's parameter limit is 65535.
        // With 3 params per row (id, data, source_type) and 1000 rows per batch:
        // 1000 * 3 = 3000 parameters per batch, which is well under the limit.
        let batch_size = 1000_usize;
        let params_per_row = 3_usize;
        let total_params = batch_size * params_per_row;
        assert!(
            total_params < 65535,
            "Batch size {} * {} params = {} exceeds PostgreSQL limit of 65535",
            batch_size,
            params_per_row,
            total_params
        );
    }
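
    // Illustrative sketch of the placeholder numbering scheme used by
    // insert_jsonb_batch (not a database test): with 3 parameters per row,
    // row idx binds ($(3*idx+1), $(3*idx+2), $(3*idx+3)).
    #[test]
    fn test_placeholder_numbering() {
        let placeholders: Vec<String> = (0..3)
            .map(|idx| {
                let base = idx * 3 + 1;
                format!("(${}, ${}, ${})", base, base + 1, base + 2)
            })
            .collect();
        assert_eq!(
            placeholders.join(", "),
            "($1, $2, $3), ($4, $5, $6), ($7, $8, $9)"
        );
    }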
}