pg_upsert/
lib.rs

1mod builder;
2pub mod error;
3mod macros;
4pub mod types;
5
6pub use error::{Result, UpsertError};
7pub use types::{Field, FieldValue, UpsertOptions};
8
9use builder::build_upsert_sql;
10use sqlx::postgres::PgArguments;
11use sqlx::{Executor, Postgres};
12
13/// Performs a bulk upsert operation on a PostgreSQL table
14/// Returns the number of affected rows
15pub async fn upsert<'e, E>(
16    executor: E,
17    table: &str,
18    conflict_fields: &[&str],
19    values: Vec<Vec<Field>>,
20    options: UpsertOptions,
21) -> Result<u64>
22where
23    E: Executor<'e, Database = Postgres>,
24{
25    // Validate input values are not empty
26    if values.is_empty() {
27        return Err(UpsertError::EmptyValues);
28    }
29
30    let first_row = &values[0];
31    if first_row.is_empty() {
32        return Err(UpsertError::EmptyFields);
33    }
34
35    // Ensure all rows have the same number of fields with matching names
36    let expected_field_count = first_row.len();
37    for (idx, row) in values.iter().enumerate() {
38        if row.len() != expected_field_count {
39            return Err(UpsertError::FieldCountMismatch {
40                expected: expected_field_count,
41                actual: row.len(),
42            });
43        }
44
45        if idx > 0 {
46            for (i, field) in row.iter().enumerate() {
47                if field.name != first_row[i].name {
48                    return Err(UpsertError::FieldCountMismatch {
49                        expected: expected_field_count,
50                        actual: row.len(),
51                    });
52                }
53            }
54        }
55    }
56
57    // Build the SQL query
58    let query = build_upsert_sql(table, first_row, conflict_fields, values.len(), &options)?;
59
60    // Bind field values to query arguments
61    let mut args = PgArguments::default();
62    for row in &values {
63        for field in row {
64            field.value.bind_to(&mut args);
65        }
66    }
67
68    // Execute the query and return affected rows count
69    let result = sqlx::query_with(&query.sql, args).execute(executor).await?;
70
71    Ok(result.rows_affected())
72}