database_replicator/jsonb/
writer.rs

// ABOUTME: Write JSONB data to PostgreSQL with metadata
// ABOUTME: Handles table creation, single row inserts, and batch inserts

use anyhow::{Context, Result};
use tokio_postgres::Client;

/// Create a table with JSONB schema for storing non-PostgreSQL data
///
/// Creates a table with the following structure:
/// - id: TEXT PRIMARY KEY (original document/row ID)
/// - data: JSONB NOT NULL (complete document/row as JSON)
/// - _source_type: TEXT NOT NULL ('sqlite', 'mongodb', or 'mysql')
/// - _migrated_at: TIMESTAMP NOT NULL DEFAULT NOW()
///
/// Also creates two indexes:
/// - GIN index on data column for efficient JSONB queries
/// - Index on _migrated_at for temporal queries
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table to create (must be validated)
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// CRITICAL: table_name is interpolated directly into the SQL, because
/// identifiers cannot be parameterized. It is validated with
/// validate_table_name() at the top of this function; callers handling
/// untrusted names should validate them early as well.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::create_jsonb_table;
/// # use database_replicator::jsonb::validate_table_name;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// create_jsonb_table(client, table_name, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
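///
/// Once populated, the GIN index supports JSONB containment queries. A minimal
/// sketch (table and key names follow the example above and are illustrative):
///
/// ```no_run
/// # async fn query_example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// // `@>` tests JSONB containment and is accelerated by the GIN index.
/// let _rows = client
///     .query(
///         r#"SELECT id, data FROM "users" WHERE data @> $1"#,
///         &[&serde_json::json!({"name": "Alice"})],
///     )
///     .await?;
/// # Ok(())
/// # }
/// ```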
pub async fn create_jsonb_table(
    client: &Client,
    table_name: &str,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB table creation")?;

    tracing::info!(
        "Creating JSONB table '{}' for source type '{}'",
        table_name,
        source_type
    );

    // Create table with JSONB schema
    // Note: table_name is validated above, so it's safe to use in SQL
    let create_table_sql = format!(
        r#"
        CREATE TABLE IF NOT EXISTS "{}" (
            id TEXT PRIMARY KEY,
            data JSONB NOT NULL,
            _source_type TEXT NOT NULL,
            _migrated_at TIMESTAMP NOT NULL DEFAULT NOW()
        )
        "#,
        table_name
    );

    client
        .execute(&create_table_sql, &[])
        .await
        .with_context(|| format!("Failed to create JSONB table '{}'", table_name))?;

    // Create GIN index on data column for efficient JSONB queries
    let create_gin_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_data" ON "{}" USING GIN (data)"#,
        table_name, table_name
    );

    client
        .execute(&create_gin_index_sql, &[])
        .await
        .with_context(|| format!("Failed to create GIN index on table '{}'", table_name))?;

    // Create index on _migrated_at for temporal queries
    let create_time_index_sql = format!(
        r#"CREATE INDEX IF NOT EXISTS "idx_{}_migrated" ON "{}" (_migrated_at)"#,
        table_name, table_name
    );

    client
        .execute(&create_time_index_sql, &[])
        .await
        .with_context(|| {
            format!(
                "Failed to create _migrated_at index on table '{}'",
                table_name
            )
        })?;

    tracing::info!(
        "Successfully created JSONB table '{}' with indexes",
        table_name
    );

    Ok(())
}

/// Insert a single JSONB row with metadata
///
/// Inserts a single row into a JSONB table with the original ID, data, and metadata.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `id` - Original document/row ID
/// * `data` - Complete document/row as serde_json::Value
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for id, data, and source_type to prevent injection.
/// table_name cannot be parameterized; it is validated with validate_table_name()
/// inside this function, and callers should validate untrusted names early as well.
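///
/// # Errors
///
/// This is a plain INSERT with no ON CONFLICT clause, so inserting an id that
/// already exists in the table fails with a primary-key violation.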
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_row;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let data = json!({"name": "Alice", "age": 30});
/// insert_jsonb_row(client, table_name, "1", data, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
pub async fn insert_jsonb_row(
    client: &Client,
    table_name: &str,
    id: &str,
    data: serde_json::Value,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB row insert")?;

    // Use parameterized query for data and metadata (safe from injection)
    // Note: table_name is validated above
    let insert_sql = format!(
        r#"INSERT INTO "{}" (id, data, _source_type) VALUES ($1, $2, $3)"#,
        table_name
    );

    client
        .execute(&insert_sql, &[&id, &data, &source_type])
        .await
        .with_context(|| {
            format!(
                "Failed to insert row with id '{}' into '{}'",
                id, table_name
            )
        })?;

    Ok(())
}

/// Insert multiple JSONB rows in a batch
///
/// Inserts multiple rows efficiently using a multi-value INSERT statement.
/// This is significantly faster than individual inserts for large datasets.
///
/// # Arguments
///
/// * `client` - PostgreSQL client connection
/// * `table_name` - Name of the table (must be validated)
/// * `rows` - Vector of (id, data) tuples
/// * `source_type` - Source database type ('sqlite', 'mongodb', or 'mysql')
///
/// # Security
///
/// Uses parameterized queries for all data. table_name cannot be parameterized;
/// it is validated inside this function, and callers should validate it as well.
///
/// # Performance
///
/// Batches rows into groups of 1000 to stay well under PostgreSQL's limit of
/// 65535 bind parameters per statement.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::jsonb::writer::insert_jsonb_batch;
/// # use database_replicator::jsonb::validate_table_name;
/// # use serde_json::json;
/// # async fn example(client: &tokio_postgres::Client) -> anyhow::Result<()> {
/// let table_name = "users";
/// validate_table_name(table_name)?;
/// let rows = vec![
///     ("1".to_string(), json!({"name": "Alice", "age": 30})),
///     ("2".to_string(), json!({"name": "Bob", "age": 25})),
/// ];
/// insert_jsonb_batch(client, table_name, rows, "sqlite").await?;
/// # Ok(())
/// # }
/// ```
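///
/// Chunks are executed as independent statements rather than in a single
/// transaction, so if a later chunk fails, earlier chunks remain committed.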
pub async fn insert_jsonb_batch(
    client: &Client,
    table_name: &str,
    rows: Vec<(String, serde_json::Value)>,
    source_type: &str,
) -> Result<()> {
    // Validate table name to prevent SQL injection
    crate::jsonb::validate_table_name(table_name)
        .context("Invalid table name for JSONB batch insert")?;

    if rows.is_empty() {
        return Ok(());
    }

    tracing::info!(
        "Inserting {} rows into JSONB table '{}'",
        rows.len(),
        table_name
    );

    // Batch inserts to avoid the parameter limit (PostgreSQL allows at most
    // 65535 bind parameters per statement). With 3 parameters per row
    // (id, data, source_type) the hard cap is 65535 / 3 = 21845 rows;
    // use a conservative 1000 rows per batch
    const BATCH_SIZE: usize = 1000;

    for (batch_num, chunk) in rows.chunks(BATCH_SIZE).enumerate() {
        // Build parameterized multi-value INSERT
        // Format: INSERT INTO table (cols) VALUES ($1,$2,$3),($4,$5,$6),...
        let mut value_placeholders = Vec::with_capacity(chunk.len());
        let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
            Vec::with_capacity(chunk.len() * 3);

        for (idx, (id, data)) in chunk.iter().enumerate() {
            let param_base = idx * 3 + 1;
            value_placeholders.push(format!(
                "(${}, ${}, ${})",
                param_base,
                param_base + 1,
                param_base + 2
            ));

            // Add parameters in order: id, data, source_type
            params.push(id);
            params.push(data);
            params.push(&source_type);
        }

        let insert_sql = format!(
            r#"INSERT INTO "{}" (id, data, _source_type) VALUES {}"#,
            table_name,
            value_placeholders.join(", ")
        );

        client
            .execute(&insert_sql, &params)
            .await
            .with_context(|| {
                format!(
                    "Failed to insert batch {} ({} rows) into '{}'",
                    batch_num,
                    chunk.len(),
                    table_name
                )
            })?;

        tracing::debug!(
            "Inserted batch {} ({} rows) into '{}'",
            batch_num,
            chunk.len(),
            table_name
        );
    }

    tracing::info!(
        "Successfully inserted {} rows into '{}'",
        rows.len(),
        table_name
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_batch_insert_empty() {
        // Empty batch should not error
        // (an actual async test requires a test database; see the ignored
        // integration sketch below)
    }
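
    // A sketch of an integration test for the empty-batch path, assuming a
    // locally reachable PostgreSQL instance and tokio's `macros` feature for
    // `#[tokio::test]`; the connection string and table name below are
    // placeholders. Marked #[ignore] so it only runs on demand, e.g. via
    // `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore]
    async fn test_batch_insert_empty_integration() -> anyhow::Result<()> {
        let (client, connection) = tokio_postgres::connect(
            "host=localhost user=postgres dbname=replicator_test",
            tokio_postgres::NoTls,
        )
        .await?;
        // The connection future must be driven for queries to make progress.
        tokio::spawn(connection);

        super::create_jsonb_table(&client, "empty_batch_test", "sqlite").await?;
        // An empty batch should return Ok without executing any INSERT.
        super::insert_jsonb_batch(&client, "empty_batch_test", vec![], "sqlite").await?;
        Ok(())
    }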

    #[test]
    fn test_batch_size_calculation() {
        // Verify our batch size doesn't exceed parameter limits
        // PostgreSQL parameter limit is 65535
        // With 3 params per row (id, data, source_type) and 1000 rows per batch:
        // 1000 * 3 = 3000 parameters per batch, which is well under the limit
        let batch_size = 1000_usize;
        let params_per_row = 3_usize;
        let total_params = batch_size * params_per_row;
        assert!(
            total_params < 65535,
            "Batch size {} * {} params = {} exceeds PostgreSQL limit of 65535",
            batch_size,
            params_per_row,
            total_params
        );
    }
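
    // A small check that mirrors the placeholder construction in
    // insert_jsonb_batch (it duplicates the arithmetic rather than calling
    // the private loop directly): row idx uses parameters
    // $(3*idx+1)..$(3*idx+3).
    #[test]
    fn test_placeholder_numbering() {
        let placeholders: Vec<String> = (0..2)
            .map(|idx| {
                let base = idx * 3 + 1;
                format!("(${}, ${}, ${})", base, base + 1, base + 2)
            })
            .collect();
        assert_eq!(placeholders.join(", "), "($1, $2, $3), ($4, $5, $6)");
    }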
}