use crate::drivers::postgresql::pool_manager::{ConnectionPoolManager, ConnectionPoolSnapshot};
use crate::parser::query_builder::{
Condition, build_insert_placeholders_for_entries, build_where_clause, format_condition_clause,
sanitize_identifier, sanitize_qualified_table_identifier,
};
use crate::utils::postgres_types::timestamptz_cast_for_column;
use anyhow::{Context, Result, anyhow};
use futures::future::join_all;
use serde_json::{Map, Value};
use uuid::Uuid;
use sqlx::Error as SqlxError;
use sqlx::Row;
use sqlx::postgres::{PgArguments, PgPool, PgRow};
use sqlx::query::Query;
use sqlx::types::Json;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::sync::RwLock;
use tracing::{error, info};
/// Renders the `SET` fragments for an UPDATE statement, one `$n` placeholder
/// per column in entry order (placeholders are 1-based). Columns that
/// `timestamptz_cast_for_column` flags get an explicit `::timestamptz` cast so
/// text-bound values coerce to timestamps server-side.
fn build_update_set_parts(entries: &[(String, Value)]) -> Vec<String> {
    let mut parts: Vec<String> = Vec::with_capacity(entries.len());
    for (position, (column, value)) in entries.iter().enumerate() {
        let placeholder = position + 1;
        let part = if timestamptz_cast_for_column(column, value) {
            format!("{} = ${}::timestamptz", column, placeholder)
        } else {
            format!("{} = ${}", column, placeholder)
        };
        parts.push(part);
    }
    parts
}
/// Serializable snapshot of a client known to [`PostgresClientRegistry`]:
/// where the registration came from, how to reconnect it, and whether a live
/// pool currently backs it.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RegisteredClient {
    // Display name exactly as provided by the caller (not normalized).
    pub client_name: String,
    // Origin of the registration, e.g. "config" for entries loaded at startup.
    pub source: String,
    pub description: Option<String>,
    // Direct connection URI, when one was supplied (config-sourced clients
    // created by `from_entries` leave this None).
    pub pg_uri: Option<String>,
    // Name of an environment variable holding the connection URI.
    pub pg_uri_env_var: Option<String>,
    // URI template resolved through the shared parser at connect time.
    pub config_uri_template: Option<String>,
    pub is_active: bool,
    pub is_frozen: bool,
    // True while a pool for this client exists in the registry.
    pub pool_connected: bool,
}
/// Desired state for a client, consumed by
/// [`PostgresClientRegistry::upsert_client`] and
/// [`PostgresClientRegistry::remember_client`]. Mirrors [`RegisteredClient`]
/// minus the runtime-derived `pool_connected` flag.
#[derive(Debug, Clone)]
pub struct ClientConnectionTarget {
    pub client_name: String,
    pub source: String,
    pub description: Option<String>,
    // Exactly one of the three URI sources is needed for an active client;
    // they are tried in this order (see `resolve_target_connection_uri`).
    pub pg_uri: Option<String>,
    pub pg_uri_env_var: Option<String>,
    pub config_uri_template: Option<String>,
    pub is_active: bool,
    pub is_frozen: bool,
}
/// Thread-safe registry mapping normalized client names (trimmed, lowercased)
/// to live Postgres connection pools plus per-client metadata.
pub struct PostgresClientRegistry {
    // Live pools, keyed by normalized client name.
    pools: RwLock<HashMap<String, PgPool>>,
    // Metadata keyed the same way; an entry may exist without a pool
    // (inactive, frozen, or failed clients).
    clients: RwLock<HashMap<String, RegisteredClient>>,
    // Factory used to establish new pools in `upsert_client`.
    pool_manager: ConnectionPoolManager,
}
impl PostgresClientRegistry {
    /// Creates a registry with no pools, no metadata, and a default pool
    /// manager.
    pub fn empty() -> Self {
        Self {
            pools: RwLock::new(HashMap::new()),
            clients: RwLock::new(HashMap::new()),
            pool_manager: ConnectionPoolManager::default(),
        }
    }
    /// Creates an empty registry that will use the supplied pool manager for
    /// all future connections.
    pub fn with_pool_manager(pool_manager: ConnectionPoolManager) -> Self {
        Self {
            pools: RwLock::new(HashMap::new()),
            clients: RwLock::new(HashMap::new()),
            pool_manager,
        }
    }
    /// True when no client is currently usable (active, unfrozen, connected)
    /// — see [`Self::list_clients`] for the filter.
    pub fn is_empty(&self) -> bool {
        self.list_clients().is_empty()
    }
    /// Builds a registry by connecting to every `(client_name, uri)` entry
    /// concurrently. Successes are registered as active, connected,
    /// "config"-sourced clients; failures are returned alongside the registry
    /// so the caller can report them without aborting startup.
    pub async fn from_entries(
        entries: Vec<(String, String)>,
        pool_manager: ConnectionPoolManager,
    ) -> Result<(Self, Vec<(String, anyhow::Error)>)> {
        let connect_tasks = entries.into_iter().map(|(client_name, uri)| {
            // Each task gets its own manager clone so the futures are 'static
            // over the loop.
            let pool_manager: ConnectionPoolManager = pool_manager.clone();
            async move {
                // URIs are redacted before logging so passwords never land in logs.
                tracing::info!(client = %client_name, uri = %redact_uri_for_log(&uri), "connecting to Postgres client");
                match pool_manager.connect(&uri).await {
                    Ok(pool) => {
                        tracing::info!(client = %client_name, "connected to Postgres client");
                        Ok((client_name, pool))
                    }
                    Err(err) => {
                        let context_error = anyhow!(
                            "failed to connect to postgres client {}: {}",
                            client_name,
                            err
                        );
                        tracing::warn!(
                            client = %client_name,
                            uri = %redact_uri_for_log(&uri),
                            error = %err,
                            "failed to connect to Postgres client"
                        );
                        Err((client_name, context_error))
                    }
                }
            }
        });
        let mut pools: HashMap<String, PgPool> = HashMap::new();
        let mut clients: HashMap<String, RegisteredClient> = HashMap::new();
        let mut errors: Vec<(String, anyhow::Error)> = Vec::new();
        // All connection attempts run concurrently; join_all preserves entry order.
        for result in join_all(connect_tasks).await {
            match result {
                Ok((client_name, pool)) => {
                    let normalized: String = normalize_client_key(&client_name);
                    clients.insert(
                        normalized.clone(),
                        RegisteredClient {
                            client_name: client_name.clone(),
                            source: "config".to_string(),
                            description: None,
                            pg_uri: None,
                            pg_uri_env_var: None,
                            config_uri_template: None,
                            is_active: true,
                            is_frozen: false,
                            pool_connected: true,
                        },
                    );
                    pools.insert(normalized, pool);
                }
                Err((client_name, err)) => {
                    errors.push((client_name, err));
                }
            }
        }
        Ok((
            Self {
                pools: RwLock::new(pools),
                clients: RwLock::new(clients),
                pool_manager,
            },
            errors,
        ))
    }
    /// Returns a clone of the live pool for `key` (lookup is trim- and
    /// case-insensitive). A poisoned lock is treated as "no pool".
    pub fn get_pool(&self, key: &str) -> Option<PgPool> {
        self.pools
            .read()
            .ok()
            .and_then(|pools| pools.get(&normalize_client_key(key)).cloned())
    }
    /// Lists display names of clients that are active, not frozen, and
    /// currently connected, sorted case-insensitively.
    pub fn list_clients(&self) -> Vec<String> {
        let Ok(clients) = self.clients.read() else {
            // Poisoned lock: report no usable clients rather than panic.
            return Vec::new();
        };
        let mut names: Vec<String> = clients
            .values()
            .filter(|client| client.is_active && !client.is_frozen && client.pool_connected)
            .map(|client| client.client_name.clone())
            .collect();
        names.sort_by_cached_key(|value| value.to_lowercase());
        names
    }
    /// Lists every registered client — connected or not — sorted
    /// case-insensitively by display name.
    pub fn list_registered_clients(&self) -> Vec<RegisteredClient> {
        let Ok(clients) = self.clients.read() else {
            return Vec::new();
        };
        let mut values: Vec<RegisteredClient> = clients.values().cloned().collect();
        values.sort_by_cached_key(|value| value.client_name.to_lowercase());
        values
    }
    /// Looks up one client's metadata by (trim- and case-insensitive) name.
    pub fn registered_client(&self, key: &str) -> Option<RegisteredClient> {
        self.clients
            .read()
            .ok()
            .and_then(|clients| clients.get(&normalize_client_key(key)).cloned())
    }
    /// Records (or overwrites) client metadata without creating or dropping
    /// any pool; `pool_connected` is supplied by the caller.
    pub fn remember_client(&self, target: ClientConnectionTarget, pool_connected: bool) {
        let normalized: String = normalize_client_key(&target.client_name);
        if let Ok(mut clients) = self.clients.write() {
            clients.insert(
                normalized,
                RegisteredClient {
                    client_name: target.client_name,
                    source: target.source,
                    description: target.description,
                    pg_uri: target.pg_uri,
                    pg_uri_env_var: target.pg_uri_env_var,
                    config_uri_template: target.config_uri_template,
                    is_active: target.is_active,
                    is_frozen: target.is_frozen,
                    pool_connected,
                },
            );
        }
    }
    /// Registers or updates a client. Inactive or frozen targets have any
    /// existing pool dropped and only metadata stored. Active targets are
    /// connected via the first available of `pg_uri`, `pg_uri_env_var`, or
    /// `config_uri_template`; on connection failure the error is returned and
    /// the registry is left unchanged (the early `?` returns before any map
    /// insert).
    pub async fn upsert_client(&self, target: ClientConnectionTarget) -> Result<(), anyhow::Error> {
        let normalized = normalize_client_key(&target.client_name);
        let mut registered = RegisteredClient {
            client_name: target.client_name.clone(),
            source: target.source.clone(),
            description: target.description.clone(),
            pg_uri: target.pg_uri.clone(),
            pg_uri_env_var: target.pg_uri_env_var.clone(),
            config_uri_template: target.config_uri_template.clone(),
            is_active: target.is_active,
            is_frozen: target.is_frozen,
            pool_connected: false,
        };
        if !target.is_active || target.is_frozen {
            // Disabled client: drop any live pool, keep metadata only.
            if let Ok(mut pools) = self.pools.write() {
                pools.remove(&normalized);
            }
            if let Ok(mut clients) = self.clients.write() {
                clients.insert(normalized, registered);
            }
            return Ok(());
        }
        let connection_uri: String = resolve_target_connection_uri(&target).ok_or_else(|| {
            anyhow!(
                "client '{}' must define either pg_uri, pg_uri_env_var, or config_uri_template",
                target.client_name
            )
        })?;
        let pool = self
            .pool_manager
            .connect(&connection_uri)
            .await
            .with_context(|| {
                format!(
                    "failed to connect to postgres client {}",
                    target.client_name
                )
            })?;
        registered.pool_connected = true;
        // The two locks are taken one at a time (pools first), never nested.
        if let Ok(mut pools) = self.pools.write() {
            pools.insert(normalized.clone(), pool);
        }
        if let Ok(mut clients) = self.clients.write() {
            clients.insert(normalized, registered);
        }
        Ok(())
    }
    /// Removes a client's pool and metadata entirely.
    pub fn remove_client(&self, key: &str) {
        let normalized: String = normalize_client_key(key);
        if let Ok(mut pools) = self.pools.write() {
            pools.remove(&normalized);
        }
        if let Ok(mut clients) = self.clients.write() {
            clients.remove(&normalized);
        }
    }
    /// Drops a client's pool and flags its metadata as disconnected, keeping
    /// the registration so it can be reconnected later.
    pub fn mark_unavailable(&self, key: &str) {
        let normalized = normalize_client_key(key);
        if let Ok(mut pools) = self.pools.write() {
            pools.remove(&normalized);
        }
        if let Ok(mut clients) = self.clients.write()
            && let Some(client) = clients.get_mut(&normalized)
        {
            client.pool_connected = false;
        }
    }
    /// Re-derives every client's `pool_connected` flag from the current set
    /// of live pools.
    pub fn sync_connection_status(&self) {
        let pool_keys: HashSet<String> = {
            let Ok(pools) = self.pools.read() else {
                return;
            };
            // Copy the key set so the pools lock is released before taking
            // the clients lock.
            pools.keys().cloned().collect()
        };
        if let Ok(mut clients) = self.clients.write() {
            for client in clients.values_mut() {
                let normalized = normalize_client_key(&client.client_name);
                client.pool_connected = pool_keys.contains(&normalized);
            }
        }
    }
    /// Captures a diagnostic snapshot of every live pool.
    pub fn pool_snapshots(&self) -> Vec<ConnectionPoolSnapshot> {
        let Ok(pools) = self.pools.read() else {
            return Vec::new();
        };
        pools
            .iter()
            .map(|(client, pool)| ConnectionPoolSnapshot::new(client.clone(), pool))
            .collect()
    }
}
/// Canonical registry key for a client name: surrounding whitespace removed,
/// then lowercased so lookups are case-insensitive.
fn normalize_client_key(value: &str) -> String {
    let trimmed = value.trim();
    trimmed.to_lowercase()
}
/// Public entry point exposing the registry's client-key normalization
/// (trim, then lowercase) to other modules.
pub fn normalize_postgres_client_key(value: &str) -> String {
    value.trim().to_lowercase()
}
/// Masks the password portion of a Postgres connection URI for logging.
///
/// URIs without a recognized `postgres://` / `postgresql://` scheme are fully
/// redacted; URIs with no `user:password@` section are returned unchanged.
fn redact_uri_for_log(uri: &str) -> String {
    let scheme = ["postgresql://", "postgres://"]
        .iter()
        .find(|prefix| uri.starts_with(*prefix));
    let Some(prefix) = scheme else {
        return "[REDACTED]".to_string();
    };
    let remainder = &uri[prefix.len()..];
    // Split on the LAST '@' so passwords containing '@' are fully masked.
    if let Some(at_pos) = remainder.rfind('@') {
        let (userinfo, host_part) = remainder.split_at(at_pos);
        // First ':' separates user from password inside the userinfo section.
        if let Some((user, _password)) = userinfo.split_once(':') {
            return format!("{}{}:***@{}", prefix, user, &host_part[1..]);
        }
    }
    uri.to_string()
}
/// Picks the connection URI for a target, trying `pg_uri` first, then
/// `pg_uri_env_var` (expanded via the shared URI resolver), then
/// `config_uri_template`. Blank or whitespace-only values count as unset.
fn resolve_target_connection_uri(target: &ClientConnectionTarget) -> Option<String> {
    fn non_blank(value: &&String) -> bool {
        !value.trim().is_empty()
    }
    if let Some(uri) = target.pg_uri.as_ref().filter(non_blank) {
        return Some(uri.clone());
    }
    if let Some(env_var) = target.pg_uri_env_var.as_ref().filter(non_blank) {
        // Wrap the variable name in ${...} so the shared resolver performs
        // the environment lookup.
        let template = format!("${{{}}}", env_var);
        return Some(crate::parser::resolve_postgres_uri(&template));
    }
    let template = target.config_uri_template.as_ref().filter(non_blank)?;
    Some(crate::parser::resolve_postgres_uri(template))
}
fn bind_array_or_json<'q>(
query: Query<'q, sqlx::Postgres, PgArguments>,
value: &Value,
) -> Query<'q, sqlx::Postgres, PgArguments> {
let Some(arr) = value.as_array() else {
return query.bind(Json(value.clone()));
};
if arr.is_empty() {
return query.bind(Vec::<String>::new());
}
if let Some(values) = arr
.iter()
.map(|item| item.as_str().and_then(|s| Uuid::parse_str(s).ok()))
.collect::<Option<Vec<Uuid>>>()
{
return query.bind(values);
}
if let Some(values) = arr
.iter()
.map(|item| item.as_str().map(ToString::to_string))
.collect::<Option<Vec<String>>>()
{
return query.bind(values);
}
if let Some(values) = arr
.iter()
.map(|item| item.as_i64())
.collect::<Option<Vec<i64>>>()
{
return query.bind(values);
}
if let Some(values) = arr
.iter()
.map(|item| item.as_bool())
.collect::<Option<Vec<bool>>>()
{
return query.bind(values);
}
if let Some(values) = arr
.iter()
.map(|item| item.as_f64())
.collect::<Option<Vec<f64>>>()
{
return query.bind(values);
}
query.bind(Json(value.clone()))
}
/// Binds a JSON value to the next placeholder for WHERE-clause parameters.
///
/// Mapping: null -> NULL, bool -> BOOL, integer -> BIGINT (large u64 that
/// cannot fit becomes TEXT), float -> DOUBLE PRECISION, string -> TEXT,
/// array -> typed array or JSONB (see `bind_array_or_json`), object -> JSONB.
macro_rules! bind_value {
    ($query:expr, $value:expr) => {
        match $value {
            Value::Null => $query.bind(None::<String>),
            Value::Bool(b) => $query.bind(*b),
            Value::Number(num) => {
                if let Some(i) = num.as_i64() {
                    $query.bind(i)
                } else if let Some(u) = num.as_u64() {
                    // Check u64 BEFORE f64: as_f64() succeeds (lossily) for
                    // every u64, so the previous f64-first order made this
                    // branch unreachable and bound values above i64::MAX as
                    // imprecise doubles instead of exact text.
                    if let Ok(i) = i64::try_from(u) {
                        $query.bind(i)
                    } else {
                        $query.bind(num.to_string())
                    }
                } else if let Some(f) = num.as_f64() {
                    $query.bind(f)
                } else {
                    $query.bind(num.to_string())
                }
            }
            Value::String(s) => $query.bind(s.clone()),
            Value::Array(_) => bind_array_or_json($query, $value),
            Value::Object(_) => $query.bind(Json($value.clone())),
        }
    };
}
/// Binds a JSON value to the next placeholder for INSERT/UPDATE column
/// values. Identical to `bind_value!` except strings that parse as UUIDs are
/// bound as native UUID values (so uuid-typed columns accept them).
macro_rules! bind_value_set {
    ($query:expr, $value:expr) => {
        match $value {
            Value::Null => $query.bind(None::<String>),
            Value::Bool(b) => $query.bind(*b),
            Value::Number(num) => {
                if let Some(i) = num.as_i64() {
                    $query.bind(i)
                } else if let Some(u) = num.as_u64() {
                    // Check u64 BEFORE f64: as_f64() succeeds (lossily) for
                    // every u64, so the previous f64-first order made this
                    // branch unreachable and bound values above i64::MAX as
                    // imprecise doubles instead of exact text.
                    if let Ok(i) = i64::try_from(u) {
                        $query.bind(i)
                    } else {
                        $query.bind(num.to_string())
                    }
                } else if let Some(f) = num.as_f64() {
                    $query.bind(f)
                } else {
                    $query.bind(num.to_string())
                }
            }
            Value::String(s) => {
                if let Ok(u) = Uuid::parse_str(s) {
                    $query.bind(u)
                } else {
                    $query.bind(s.clone())
                }
            }
            Value::Array(_) => bind_array_or_json($query, $value),
            Value::Object(_) => $query.bind(Json($value.clone())),
        }
    };
}
/// Errors produced by the insert/upsert helpers in this module.
#[derive(Debug)]
pub enum PostgresInsertError {
    // The table identifier (or, in `upsert_row`, the conflict column) failed
    // sanitization.
    InvalidTableName,
    // The payload was not the expected JSON shape; the string says why.
    InvalidPayload(String),
    // No payload keys survived identifier sanitization.
    NoValidColumns,
    // The RETURNING clause did not yield the expected `data` column.
    MissingReturnColumn,
    // The statement failed at execution time.
    SqlExecution {
        message: String,
        // Postgres SQLSTATE code, when the driver reported one.
        sql_state: Option<String>,
    },
}
/// Inserts one row described by a JSON object and returns the full inserted
/// row as JSON (via `RETURNING to_jsonb`).
///
/// Payload keys that fail identifier sanitization are silently dropped.
/// Errors distinguish an invalid table name, a non-object payload, a payload
/// with no usable columns, SQL failures (with the SQLSTATE when the database
/// reported one), and a missing RETURNING column.
pub async fn insert_row(
    pool: &PgPool,
    table_name: &str,
    payload: &Value,
) -> Result<Value, PostgresInsertError> {
    let table = sanitize_qualified_table_identifier(table_name)
        .ok_or(PostgresInsertError::InvalidTableName)?;
    let object: &Map<String, Value> = payload.as_object().ok_or_else(|| {
        PostgresInsertError::InvalidPayload("insert payload must be an object".to_string())
    })?;
    let mut entries: Vec<(String, Value)> = Vec::with_capacity(object.len());
    for (column, value) in object {
        if let Some(sanitized) = sanitize_identifier(column) {
            entries.push((sanitized, value.clone()));
        }
    }
    if entries.is_empty() {
        return Err(PostgresInsertError::NoValidColumns);
    }
    let (placeholders, bind_values) = build_insert_placeholders_for_entries(&entries);
    let column_list = entries
        .iter()
        .map(|(column, _)| column.as_str())
        .collect::<Vec<_>>()
        .join(", ");
    let sql = format!(
        "INSERT INTO {table} AS t ({columns}) VALUES ({placeholders}) RETURNING to_jsonb(t.*) AS data",
        table = table,
        columns = column_list,
        placeholders = placeholders.join(", ")
    );
    let mut query: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&sql);
    for value in bind_values {
        query = bind_value_set!(query, value);
    }
    let row: PgRow = query.fetch_one(pool).await.map_err(|err| match err {
        // Database-level failures carry a SQLSTATE worth surfacing.
        SqlxError::Database(db_err) => PostgresInsertError::SqlExecution {
            message: db_err.message().to_string(),
            sql_state: db_err.code().map(|code| code.to_string()),
        },
        other => PostgresInsertError::SqlExecution {
            message: other.to_string(),
            sql_state: None,
        },
    })?;
    let data: Json<Value> = row
        .try_get("data")
        .map_err(|_| PostgresInsertError::MissingReturnColumn)?;
    Ok(data.0)
}
pub async fn insert_rows_bulk(
pool: &PgPool,
table_name: &str,
payloads: &[Value],
) -> Result<Vec<Value>, PostgresInsertError> {
if payloads.is_empty() {
return Err(PostgresInsertError::InvalidPayload(
"insert payload array must not be empty".to_string(),
));
}
let mut column_order: Vec<(String, String)> = Vec::new();
let mut seen_columns: HashSet<String> = HashSet::new();
for payload in payloads {
let obj = payload.as_object().ok_or_else(|| {
PostgresInsertError::InvalidPayload(
"each insert payload must be a JSON object".to_string(),
)
})?;
for column in obj.keys() {
if seen_columns.contains(column) {
continue;
}
if let Some(sanitized) = sanitize_identifier(column) {
seen_columns.insert(column.clone());
column_order.push((column.clone(), sanitized));
}
}
}
if column_order.is_empty() {
return Err(PostgresInsertError::NoValidColumns);
}
let sanitized_columns: Vec<String> = column_order
.iter()
.map(|(_, sanitized)| sanitized.clone())
.collect();
let column_names: Vec<String> = column_order.iter().map(|(raw, _)| raw.clone()).collect();
let mut placeholders: Vec<String> = Vec::new();
let mut bind_values: Vec<Value> = Vec::new();
let mut param_index: i32 = 1;
for payload in payloads {
let row_obj: &Map<String, Value> = payload.as_object().unwrap();
let mut row_placeholders: Vec<String> = Vec::new();
for (raw_col, sanitized_col) in column_names.iter().zip(sanitized_columns.iter()) {
let value = row_obj.get(raw_col).cloned().unwrap_or(Value::Null);
let cell = if value.is_null() {
format!("${}", param_index)
} else if timestamptz_cast_for_column(sanitized_col, &value) {
format!("${}::timestamptz", param_index)
} else {
format!("${}", param_index)
};
bind_values.push(value);
row_placeholders.push(cell);
param_index += 1;
}
placeholders.push(format!("({})", row_placeholders.join(", ")));
}
let table: String = sanitize_qualified_table_identifier(table_name)
.ok_or(PostgresInsertError::InvalidTableName)?;
let sql: String = format!(
"INSERT INTO {table} AS t ({columns}) VALUES {placeholders} RETURNING to_jsonb(t.*) AS data",
table = table,
columns = sanitized_columns.join(", "),
placeholders = placeholders.join(", ")
);
let mut query: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&sql);
for value in &bind_values {
query = bind_value_set!(query, value);
}
let rows: Vec<PgRow> =
query
.fetch_all(pool)
.await
.map_err(|err| PostgresInsertError::SqlExecution {
message: err.to_string(),
sql_state: None,
})?;
let mut result: Vec<Value> = Vec::new();
for row in rows {
let data: Json<Value> = row
.try_get("data")
.map_err(|_| PostgresInsertError::MissingReturnColumn)?;
result.push(data.0);
}
Ok(result)
}
pub async fn upsert_row(
pool: &PgPool,
table_name: &str,
payload: &Value,
conflict_column: &str,
) -> Result<Value, PostgresInsertError> {
let table: String = sanitize_qualified_table_identifier(table_name)
.ok_or(PostgresInsertError::InvalidTableName)?;
let conflict: String =
sanitize_identifier(conflict_column).ok_or(PostgresInsertError::InvalidTableName)?;
let object: &Map<String, Value> = payload.as_object().ok_or_else(|| {
PostgresInsertError::InvalidPayload("upsert payload must be an object".to_string())
})?;
let entries: Vec<(String, Value)> = object
.iter()
.filter_map(|(column, value)| {
sanitize_identifier(column).map(|sanitized| (sanitized, value.clone()))
})
.collect::<Vec<_>>();
if entries.is_empty() {
return Err(PostgresInsertError::NoValidColumns);
}
let columns: Vec<&str> = entries.iter().map(|(column, _)| column.as_str()).collect();
let (placeholders, bind_values) = build_insert_placeholders_for_entries(&entries);
let set_clause: Vec<String> = entries
.iter()
.map(|(column, _)| format!("{} = EXCLUDED.{}", column, column.trim_matches('"')))
.collect::<Vec<_>>();
let sql: String = format!(
"INSERT INTO {table} AS t ({columns}) VALUES ({placeholders}) ON CONFLICT ({conflict}) DO UPDATE SET {set_clause} RETURNING to_jsonb(t.*) AS data",
table = table,
columns = columns.join(", "),
placeholders = placeholders.join(", "),
conflict = conflict,
set_clause = set_clause.join(", ")
);
let mut query: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&sql);
for value in bind_values {
query = bind_value_set!(query, value);
}
let row: PgRow =
query
.fetch_one(pool)
.await
.map_err(|err| PostgresInsertError::SqlExecution {
message: err.to_string(),
sql_state: None,
})?;
let data: Json<Value> = row
.try_get("data")
.map_err(|_| PostgresInsertError::MissingReturnColumn)?;
Ok(data.0)
}
pub async fn update_row(
pool: &PgPool,
table_name: &str,
conditions: &[Condition],
payload: &Value,
) -> Result<Value> {
let table: String =
sanitize_identifier(table_name).ok_or_else(|| anyhow!("invalid table name"))?;
let entries: Vec<(String, Value)> = payload
.as_object()
.context("update payload must be an object")?
.iter()
.filter_map(|(column, value)| {
sanitize_identifier(column).map(|sanitized| (sanitized, value.clone()))
})
.collect::<Vec<_>>();
if entries.is_empty() {
return Err(anyhow!("no valid columns provided for update"));
}
let set_parts: Vec<String> = build_update_set_parts(&entries);
let (where_clause, where_values) = build_where_clause(conditions, entries.len() + 1)?;
if where_clause.is_empty() {
return Err(anyhow!("at least one valid condition is required"));
}
let sql: String = format!(
"UPDATE {table} AS t SET {set_clause}{where_clause} RETURNING to_jsonb(t.*) AS data",
table = table,
set_clause = set_parts.join(", "),
where_clause = where_clause
);
let mut query: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&sql);
for (_, value) in &entries {
query = bind_value_set!(query, value);
}
for value in &where_values {
query = bind_value!(query, value);
}
let row: PgRow = query
.fetch_one(pool)
.await
.context("failed to execute update row")?;
let data: Json<Value> = row
.try_get("data")
.context("missing data column after update")?;
Ok(data.0)
}
pub async fn update_rows(
pool: &PgPool,
table_name: &str,
conditions: &[Condition],
payload: &Value,
) -> Result<Vec<Value>> {
let table: String =
sanitize_identifier(table_name).ok_or_else(|| anyhow!("invalid table name"))?;
let entries: Vec<(String, Value)> = payload
.as_object()
.context("update payload must be an object")?
.iter()
.filter_map(|(column, value)| {
sanitize_identifier(column).map(|sanitized| (sanitized, value.clone()))
})
.collect::<Vec<_>>();
if entries.is_empty() {
return Err(anyhow!("no valid columns provided for update"));
}
let set_parts: Vec<String> = build_update_set_parts(&entries);
let (where_clause, where_values) = build_where_clause(conditions, entries.len() + 1)?;
if where_clause.is_empty() {
return Err(anyhow!("at least one valid condition is required"));
}
let sql: String = format!(
"UPDATE {table} AS t SET {set_clause}{where_clause} RETURNING to_jsonb(t.*) AS data",
table = table,
set_clause = set_parts.join(", "),
where_clause = where_clause
);
let mut query: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&sql);
for (_, value) in &entries {
query = bind_value_set!(query, value);
}
for value in &where_values {
query = bind_value!(query, value);
}
let rows: Vec<PgRow> = query
.fetch_all(pool)
.await
.context("failed to execute update rows")?;
let mut result: Vec<Value> = Vec::new();
for row in rows {
let data: Json<Value> = row
.try_get("data")
.context("missing data column after bulk update")?;
result.push(data.0);
}
Ok(result)
}
/// Builds an ` ORDER BY` SQL fragment from an optional `(column, ascending)`
/// pair. Returns an empty string when no sort was requested or the column
/// fails sanitization.
fn order_by_clause(sort: Option<(&str, bool)>) -> String {
    let Some((col, ascending)) = sort else {
        return String::new();
    };
    match sanitize_identifier(col) {
        Some(sanitized) => {
            let direction = if ascending { "ASC" } else { "DESC" };
            format!(" ORDER BY {} {}", sanitized, direction)
        }
        None => String::new(),
    }
}
/// Assembles OR-groups into one clause: conditions inside a group join with
/// OR, groups join with AND. Placeholder numbering continues from `idx`, and
/// bound values are appended to `bindings` in the exact order placeholders
/// are emitted. Returns `None` when nothing usable survives sanitization.
fn build_or_clause(
    or_condition_groups: &[Vec<Condition>],
    idx: &mut usize,
    bindings: &mut Vec<Value>,
) -> Option<String> {
    let mut rendered_groups: Vec<String> = Vec::with_capacity(or_condition_groups.len());
    for group in or_condition_groups {
        let mut exprs: Vec<String> = Vec::new();
        for condition in group {
            // Skip conditions whose column fails sanitization; the ones that
            // pass still advance idx/bindings via format_condition_clause.
            let Some(column) = sanitize_identifier(&condition.column) else {
                continue;
            };
            if let Some(expr) = format_condition_clause(&column, condition, idx, bindings) {
                exprs.push(expr);
            }
        }
        if exprs.is_empty() {
            continue;
        }
        rendered_groups.push(format!("({})", exprs.join(" OR ")));
    }
    if rendered_groups.is_empty() {
        return None;
    }
    Some(rendered_groups.join(" AND "))
}
/// Builds the SELECT statement used by the column-projection fetch helpers.
///
/// With no columns (or `*`) the whole row is returned via `row_to_json`.
/// Otherwise the requested names are resolved against the table's actual
/// columns and projected through `jsonb_build_object`, keyed by the
/// requested (pre-resolution) names. `where_clause` and `order_clause` must
/// already be safe SQL fragments; `limit`/`offset` are numeric and formatted
/// inline.
///
/// Fix: the requested column name is embedded as a SQL string literal (the
/// JSON key) — single quotes in a caller-supplied name could previously
/// terminate the literal and inject SQL; they are now doubled per standard
/// SQL string-literal escaping.
#[allow(clippy::too_many_arguments)]
async fn build_select_column_sql(
    pool: &PgPool,
    table_name: &str,
    columns: &[&str],
    where_clause: &str,
    order_clause: &str,
    limit: i64,
    offset: i64,
    allow_schema_names_prefixed_as_table_name: bool,
) -> Result<String> {
    let table: String = sanitize_qualified_table_identifier(table_name)
        .ok_or_else(|| anyhow!("invalid table name"))?;
    let use_all_columns: bool = columns.is_empty() || columns.contains(&"*");
    if use_all_columns {
        return Ok(format!(
            "SELECT row_to_json(t.*) AS data FROM {table} AS t{where_clause}{order_clause} LIMIT {limit} OFFSET {offset}",
            table = table,
            where_clause = where_clause,
            order_clause = order_clause,
            limit = limit,
            offset = offset
        ));
    }
    let resolved_columns = crate::drivers::postgresql::column_resolver::resolve_columns(
        pool,
        table_name,
        columns,
        allow_schema_names_prefixed_as_table_name,
    )
    .await?;
    let column_pairs: Vec<String> = columns
        .iter()
        .zip(resolved_columns.iter())
        .filter_map(|(requested, resolved)| {
            sanitize_identifier(resolved).map(|sanitized| {
                // Double single quotes so the requested name cannot break out
                // of the SQL string literal used as the JSON key.
                format!("'{}', t.{}", requested.replace('\'', "''"), sanitized)
            })
        })
        .collect();
    if column_pairs.is_empty() {
        return Err(anyhow!("no valid columns specified"));
    }
    Ok(format!(
        "SELECT jsonb_build_object({columns}) AS data FROM {table} AS t{where_clause}{order_clause} LIMIT {limit} OFFSET {offset}",
        columns = column_pairs.join(", "),
        table = table,
        where_clause = where_clause,
        order_clause = order_clause,
        limit = limit,
        offset = offset
    ))
}
pub async fn fetch_rows(
pool: &PgPool,
table_name: &str,
conditions: &[Condition],
limit: i64,
offset: i64,
order_by: Option<(&str, bool)>,
) -> Result<Vec<Value>> {
let table: String =
sanitize_identifier(table_name).ok_or_else(|| anyhow!("invalid table name"))?;
let (where_clause, where_values) = build_where_clause(conditions, 1)?;
let order_clause: String = order_by_clause(order_by);
let sql: String = format!(
"SELECT row_to_json(t.*) AS data FROM {table} AS t{where_clause}{order_clause} LIMIT {limit} OFFSET {offset}",
table = table,
where_clause = where_clause,
order_clause = order_clause,
limit = limit,
offset = offset
);
let mut query: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&sql);
for value in &where_values {
query = bind_value!(query, value);
}
let rows: Vec<PgRow> = query
.fetch_all(pool)
.await
.context("failed to execute select query")?;
let mut result = Vec::new();
for row in rows {
let data: Json<Value> = row
.try_get("data")
.context("missing data column in select result")?;
result.push(data.0);
}
Ok(result)
}
/// Deletes every row matching `conditions` and returns each deleted row as
/// JSON. At least one valid condition is required so an unfiltered DELETE
/// can never be issued by accident.
pub async fn delete_rows(
    pool: &PgPool,
    table_name: &str,
    conditions: &[Condition],
) -> Result<Vec<Value>> {
    let table: String = sanitize_qualified_table_identifier(table_name)
        .ok_or_else(|| anyhow!("invalid table name"))?;
    let (where_clause, where_values) = build_where_clause(conditions, 1)?;
    if where_clause.is_empty() {
        return Err(anyhow!("at least one valid condition is required"));
    }
    let sql = format!(
        "DELETE FROM {} AS t{} RETURNING to_jsonb(t.*) AS data",
        table, where_clause
    );
    let mut query: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&sql);
    for value in &where_values {
        query = bind_value!(query, value);
    }
    let rows: Vec<PgRow> = query
        .fetch_all(pool)
        .await
        .context("failed to execute delete rows")?;
    rows.into_iter()
        .map(|row| {
            let data: Json<Value> = row
                .try_get("data")
                .context("missing data column after delete")?;
            Ok(data.0)
        })
        .collect()
}
/// Convenience wrapper over [`fetch_rows_with_columns_with_or_groups`] for
/// callers that only have AND-combined conditions.
pub async fn fetch_rows_with_columns(
    pool: &PgPool,
    table_name: &str,
    columns: &[&str],
    conditions: &[Condition],
    limit: i64,
    offset: i64,
    order_by: Option<(&str, bool)>,
    allow_schema_names_prefixed_as_table_name: bool,
) -> Result<Vec<Value>> {
    // Delegate with an explicitly empty set of OR groups.
    let no_or_groups: &[Vec<Condition>] = &[];
    fetch_rows_with_columns_with_or_groups(
        pool,
        table_name,
        columns,
        conditions,
        no_or_groups,
        limit,
        offset,
        order_by,
        allow_schema_names_prefixed_as_table_name,
    )
    .await
}
/// Fetches rows (as JSON) with column projection, AND conditions, and
/// optional OR-groups ANDed onto the end of the WHERE clause. Placeholder
/// numbering is continuous: AND-condition placeholders come first, then the
/// OR-group placeholders, so bind order below must stay in that exact order.
#[allow(clippy::too_many_arguments)]
pub async fn fetch_rows_with_columns_with_or_groups(
    pool: &PgPool,
    table_name: &str,
    columns: &[&str],
    and_conditions: &[Condition],
    or_condition_groups: &[Vec<Condition>],
    limit: i64,
    offset: i64,
    order_by: Option<(&str, bool)>,
    allow_schema_names_prefixed_as_table_name: bool,
) -> Result<Vec<Value>> {
    let (mut where_clause, mut where_values) = build_where_clause(and_conditions, 1)?;
    // OR-group placeholders continue numbering after the AND placeholders,
    // and their values are appended to the same bindings vector.
    let mut next_idx = where_values.len() + 1;
    if let Some(or_clause) = build_or_clause(or_condition_groups, &mut next_idx, &mut where_values)
    {
        if where_clause.is_empty() {
            where_clause = format!(" WHERE {}", or_clause);
        } else {
            where_clause.push_str(&format!(" AND {}", or_clause));
        }
    }
    let order_clause: String = order_by_clause(order_by);
    let sql = build_select_column_sql(
        pool,
        table_name,
        columns,
        &where_clause,
        &order_clause,
        limit,
        offset,
        allow_schema_names_prefixed_as_table_name,
    )
    .await?;
    let mut query: Query<'_, sqlx::Postgres, PgArguments> = sqlx::query(&sql);
    let binding_descriptions: Vec<String> = describe_bind_values(&where_values);
    // NOTE(review): bound values are logged at info level; confirm this
    // cannot leak sensitive filter values into production logs.
    info!(
        sql = %sql,
        bindings = ?binding_descriptions,
        "executing select query"
    );
    for value in &where_values {
        query = bind_value!(query, value);
    }
    let rows: Vec<PgRow> = query
        .fetch_all(pool)
        .await
        .map_err(|err| {
            // Log with full context before converting into an anyhow error.
            error!(
                sql = %sql,
                bindings = ?binding_descriptions,
                error = ?err,
                "failed to execute select query"
            );
            err
        })
        .context("failed to execute select query")?;
    let mut result: Vec<Value> = Vec::new();
    for row in rows {
        let data: Json<Value> = row
            .try_get("data")
            .context("missing data column in select result")?;
        result.push(data.0);
    }
    Ok(result)
}
#[doc(hidden)]
pub fn describe_bind_values(values: &[Value]) -> Vec<String> {
values.iter().map(describe_bind_value).collect()
}
/// Renders one bind value as `"<value> (<type>)"` for logging. Numbers are
/// classified in i64 -> u64 -> f64 order, mirroring how the bind macros
/// should choose a Postgres type.
#[doc(hidden)]
pub fn describe_bind_value(value: &Value) -> String {
    match value {
        Value::Null => String::from("null (null)"),
        Value::Bool(flag) => format!("{} (bool)", flag),
        Value::Number(num) => match (num.as_i64(), num.as_u64(), num.as_f64()) {
            (Some(i), _, _) => format!("{} (i64)", i),
            (None, Some(u), _) => format!("{} (u64)", u),
            (None, None, Some(f)) => format!("{} (f64)", f),
            (None, None, None) => format!("{} (number)", num),
        },
        Value::String(text) => format!("{} (string)", text),
        Value::Array(items) => format!("array(len={})", items.len()),
        Value::Object(fields) => format!("object(len={})", fields.len()),
    }
}
#[cfg(test)]
mod normalize_and_registry_tests {
    use super::*;
    use crate::drivers::postgresql::pool_manager::ConnectionPoolManager;
    use serde_json::json;
    // Verifies that OR-groups render as (a OR b) AND (c) and that placeholder
    // numbers and bound values are emitted in left-to-right order.
    #[test]
    fn build_or_clause_combines_groups_with_stable_bind_order() {
        let groups = vec![
            vec![
                Condition::new(
                    "id",
                    crate::parser::query_builder::ConditionOperator::Eq,
                    vec![json!(1)],
                    false,
                ),
                Condition::new(
                    "name",
                    crate::parser::query_builder::ConditionOperator::Eq,
                    vec![json!("alice")],
                    false,
                ),
            ],
            vec![Condition::new(
                "age",
                crate::parser::query_builder::ConditionOperator::Gt,
                vec![json!(21)],
                false,
            )],
        ];
        let mut idx = 1;
        let mut values = Vec::new();
        let clause = build_or_clause(&groups, &mut idx, &mut values).expect("or clause");
        assert_eq!(
            clause,
            "(t.\"id\" = $1 OR t.\"name\" = $2) AND (t.\"age\" > $3)"
        );
        assert_eq!(values, vec![json!(1), json!("alice"), json!(21)]);
        // idx advanced past all three placeholders.
        assert_eq!(idx, 4);
    }
    #[test]
    fn normalize_postgres_client_key_trims_and_lowercases() {
        assert_eq!(normalize_postgres_client_key(" Foo_BAR "), "foo_bar");
    }
    // An empty entry list should produce an empty registry with no errors.
    #[tokio::test]
    async fn from_entries_empty_yields_no_pools_and_no_errors() {
        let pm = ConnectionPoolManager::default();
        let (reg, errs) = PostgresClientRegistry::from_entries(vec![], pm)
            .await
            .expect("ok");
        assert!(errs.is_empty());
        assert!(reg.list_clients().is_empty());
    }
    // Port 1 is expected to be unreachable, so the connection attempt fails
    // and the failure is reported under the client's name.
    #[tokio::test]
    async fn from_entries_records_unavailable_connection() {
        let pm = ConnectionPoolManager::default();
        let (_reg, errs) = PostgresClientRegistry::from_entries(
            vec![(
                "bad_port".to_string(),
                "postgres://127.0.0.1:1/nodb".to_string(),
            )],
            pm,
        )
        .await
        .expect("ok");
        assert_eq!(errs.len(), 1);
        assert!(errs[0].0.contains("bad_port"));
    }
    // Every failed entry is collected; one failure must not mask another.
    #[tokio::test]
    async fn from_entries_collects_all_failures_when_no_host_reachable() {
        let pm = ConnectionPoolManager::default();
        let (reg, errs) = PostgresClientRegistry::from_entries(
            vec![
                (
                    "fails".to_string(),
                    "postgres://127.0.0.1:1/nodb".to_string(),
                ),
                (
                    "also_fails".to_string(),
                    "postgres://127.0.0.1:1/other".to_string(),
                ),
            ],
            pm,
        )
        .await
        .expect("ok");
        assert_eq!(errs.len(), 2);
        assert!(reg.list_clients().is_empty());
    }
}