pg-upsert 0.1.1

PostgreSQL UPSERT operations using sqlx
Documentation
mod builder;
pub mod error;
mod macros;
pub mod types;

pub use error::{Result, UpsertError};
pub use types::{Field, FieldValue, UpsertOptions};

use builder::build_upsert_sql;
use sqlx::postgres::PgArguments;
use sqlx::{Executor, Postgres};

/// Inserts or updates a batch of rows in a PostgreSQL table with a single statement.
///
/// The first row in `values` acts as the template: every other row must have the
/// same number of fields, with the same field names in the same positions. The SQL
/// text is produced by `build_upsert_sql`, and each field value is bound in
/// row-major order (all fields of row 0, then row 1, and so on).
///
/// On success, returns the `rows_affected()` count reported for the executed query.
///
/// # Errors
///
/// * [`UpsertError::EmptyValues`] when `values` contains no rows.
/// * [`UpsertError::EmptyFields`] when the first row contains no fields.
/// * [`UpsertError::FieldCountMismatch`] when a later row differs from the first
///   row in length. The same variant is also returned when a later row has the
///   right length but a different field name at some position.
/// * Any error propagated from building the SQL or from executing the query.
pub async fn upsert<'e, E>(
    executor: E,
    table: &str,
    conflict_fields: &[&str],
    values: Vec<Vec<Field>>,
    options: UpsertOptions,
) -> Result<u64>
where
    E: Executor<'e, Database = Postgres>,
{
    // The first row is the template that all remaining rows are compared against.
    let first_row = match values.first() {
        Some(row) => row,
        None => return Err(UpsertError::EmptyValues),
    };
    if first_row.is_empty() {
        return Err(UpsertError::EmptyFields);
    }

    // Locate the first later row whose shape differs from the template: either a
    // different length, or (for equal length) a differing name at some position.
    let expected_field_count = first_row.len();
    let offending_row = values.iter().skip(1).find(|row| {
        row.len() != expected_field_count
            || row
                .iter()
                .zip(first_row.iter())
                .any(|(field, template)| field.name != template.name)
    });
    if let Some(row) = offending_row {
        return Err(UpsertError::FieldCountMismatch {
            expected: expected_field_count,
            actual: row.len(),
        });
    }

    // Generate the statement text for this table, column set and row count.
    let query = build_upsert_sql(table, first_row, conflict_fields, values.len(), &options)?;

    // Bind every value, row by row, in field order.
    let mut args = PgArguments::default();
    for field in values.iter().flatten() {
        field.value.bind_to(&mut args);
    }

    // Run the statement and report how many rows it touched.
    let outcome = sqlx::query_with(&query.sql, args).execute(executor).await?;
    Ok(outcome.rows_affected())
}