use crate::config::Connection;
use crate::pools;
use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
use crate::types::{DbMode, Mode, Params, TableOptions};
use crate::TABLE_FIELDS;
use br_pgsql::pools::Pools;
use br_pgsql::PgsqlError;
use chrono::Local;
use json::{array, object, JsonValue};
use log::{error, info, warn};
use std::thread;
#[derive(Clone)]
pub struct Pgsql {
pub connection: Connection,
pub default: String,
pub params: Params,
pub client: Pools,
}
impl Pgsql {
pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
let port = connection
.hostport
.parse::<i32>()
.map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
let cp_connection = connection.clone();
let config = object! {
debug: cp_connection.debug,
username: cp_connection.username,
userpass: cp_connection.userpass,
database: cp_connection.database,
hostname: cp_connection.hostname,
hostport: port,
charset: cp_connection.charset.str(),
pool_max: cp_connection.pool.max_connections,
};
let mut pgsql = br_pgsql::Pgsql::new(&config)?;
let pools = pgsql.pools()?;
Ok(Self {
connection,
default: default.clone(),
params: Params::default("pgsql"),
client: pools,
})
}
fn query(&mut self, sql: &str) -> (bool, JsonValue) {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
match result {
Some(Ok(e)) => {
if self.connection.debug {
info!("查询成功: {} {}", thread_id.clone(), sql);
}
(true, e.rows)
}
Some(Err(e)) => {
error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
(false, JsonValue::from(e.to_string()))
}
None => {
error!("事务查询失败: 未找到事务连接 {thread_id}");
(false, JsonValue::from("未找到事务连接"))
}
}
} else {
let mut guard = match self.client.get_guard() {
Ok(g) => g,
Err(e) => {
error!(
"非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
);
return (false, JsonValue::from(e.to_string()));
}
};
match guard.conn().query(sql) {
Ok(e) => {
if self.connection.debug {
info!("查询成功: {} {}", thread_id.clone(), sql);
}
(true, e.rows)
}
Err(ref e) if Self::is_retriable_error(e) => {
guard.discard();
self.client.flush_idle();
warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
thread::sleep(std::time::Duration::from_millis(200));
let mut guard2 = match self.client.get_guard() {
Ok(g) => g,
Err(e) => {
error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
return (false, JsonValue::from(e.to_string()));
}
};
match guard2.conn().query(sql) {
Ok(e) => {
if self.connection.debug {
info!("查询成功(重试): {} {}", thread_id.clone(), sql);
}
(true, e.rows)
}
Err(e) => {
error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
(false, JsonValue::from(e.to_string()))
}
}
}
Err(e) => {
error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
(false, JsonValue::from(e.to_string()))
}
}
}
}
fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
let lock_timeout = self.dynamic_table_lock_timeout(&key);
if self.params.table.is_empty() {
warn!("事务写操作未设置表名,跳过应用层表锁: {thread_id}");
} else if !PGSQL_TRANSACTION_MANAGER.acquire_table_lock(
&self.params.table,
&key,
lock_timeout,
) {
error!("获取表锁超时: {} {}", self.params.table, thread_id);
return (false, JsonValue::from("table lock timeout"));
}
let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
match result {
Some(Ok(e)) => {
if self.connection.debug {
info!("提交成功: {} {}", thread_id.clone(), sql);
}
if sql.contains("INSERT") {
(true, e.rows)
} else {
(true, e.affect_count.into())
}
}
Some(Err(e)) => {
error!("事务提交失败: {thread_id} {e}");
(false, JsonValue::from(e.to_string()))
}
None => {
error!("事务执行失败: 未找到事务连接 {thread_id}");
(false, JsonValue::from("未找到事务连接"))
}
}
} else {
let mut guard = match self.client.get_guard() {
Ok(g) => g,
Err(e) => {
error!(
"非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
);
return (false, JsonValue::from(e.to_string()));
}
};
match guard.conn().execute(sql) {
Ok(e) => {
if self.connection.debug {
info!("提交成功: {} {}", thread_id.clone(), sql);
}
if sql.contains("INSERT") {
(true, e.rows)
} else {
(true, e.affect_count.into())
}
}
Err(ref e) if Self::is_retriable_error(e) => {
guard.discard();
self.client.flush_idle();
warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
thread::sleep(std::time::Duration::from_millis(200));
let mut guard2 = match self.client.get_guard() {
Ok(g) => g,
Err(e) => {
error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
return (false, JsonValue::from(e.to_string()));
}
};
match guard2.conn().execute(sql) {
Ok(e) => {
if self.connection.debug {
info!("提交成功(重试): {} {}", thread_id.clone(), sql);
}
if sql.contains("INSERT") {
(true, e.rows)
} else {
(true, e.affect_count.into())
}
}
Err(e) => {
error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
(false, JsonValue::from(e.to_string()))
}
}
}
Err(e) => {
error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
(false, JsonValue::from(e.to_string()))
}
}
}
}
fn is_retriable_error(e: &PgsqlError) -> bool {
matches!(
e,
PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
)
}
fn dynamic_table_lock_timeout(&self, key: &str) -> std::time::Duration {
let pool = &self.connection.pool;
let min_secs = pool.connect_timeout_secs.max(1);
let base_secs = pool.write_timeout_secs.max(min_secs);
let max_secs = pool
.read_timeout_secs
.saturating_add(pool.write_timeout_secs)
.max(base_secs.saturating_add(10))
.max(min_secs);
let depth = PGSQL_TRANSACTION_MANAGER.get_depth(key).max(1) as u64;
let (conn_count, lock_count) = PGSQL_TRANSACTION_MANAGER.stats();
let pressure = (conn_count + lock_count) as u64;
let pressure_bonus = (pressure / 2).min(10);
let depth_bonus = depth.saturating_sub(1).min(5);
let timeout_secs = base_secs
.saturating_add(pressure_bonus)
.saturating_add(depth_bonus)
.clamp(min_secs, max_secs);
std::time::Duration::from_secs(timeout_secs)
}
}
impl DbMode for Pgsql {
fn database_tables(&mut self) -> JsonValue {
let sql = "SHOW TABLES".to_string();
match self.sql(sql.as_str()) {
Ok(e) => {
let mut list = vec![];
for item in e.members() {
for (_, value) in item.entries() {
list.push(value.clone());
}
}
list.into()
}
Err(_) => {
array![]
}
}
}
fn database_create(&mut self, name: &str) -> bool {
let sql = format!("CREATE DATABASE {name}");
let (state, data) = self.execute(sql.as_str());
match state {
true => data.as_bool().unwrap_or(true),
false => {
error!("创建数据库失败: {data:?}");
false
}
}
}
fn truncate(&mut self, table: &str) -> bool {
let sql = format!("TRUNCATE TABLE {table}");
let (state, _) = self.execute(sql.as_str());
state
}
}
impl Mode for Pgsql {
fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
let mut sql = String::new();
let mut comments = vec![];
if !options.table_unique.is_empty() {
let full_name = format!(
"{}_unique_{}",
options.table_name,
options.table_unique.join("_")
);
let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
let unique = format!(
"CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
name,
options.table_name,
options.table_unique.join(",")
);
comments.push(unique);
}
for row in options.table_index.iter() {
let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
let name = format!("{}_index_{}", options.table_name, &md5[..16]);
let index = format!(
"CREATE INDEX IF NOT EXISTS {} ON {} ({})",
name,
options.table_name,
row.join(",")
);
comments.push(index);
}
for (name, field) in options.table_fields.entries_mut() {
field["table_name"] = options.table_name.clone().into();
let row = br_fields::field("pgsql", name, field.clone());
let (col_sql, meta) = if let Some(idx) = row.find("--") {
(row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
} else {
(row.trim(), None)
};
if let Some(meta) = meta {
comments.push(format!(
"COMMENT ON COLUMN {}.\"{}\" IS '{}';",
options.table_name, name, meta
));
}
sql = format!("{} {},\r\n", sql, col_sql);
}
let primary_key = format!(
"CONSTRAINT {}_{} PRIMARY KEY ({})",
options.table_name, options.table_key, options.table_key
);
let sql = format!(
"CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
options.table_name,
sql.trim_end_matches(",\r\n"),
primary_key
);
comments.insert(0, sql);
for (_name, field) in options.table_fields.entries() {
let _ = field["mode"].as_str();
}
if self.params.sql {
let info = comments.join("\r\n");
return JsonValue::from(info);
}
for comment in comments {
let (state, _) = self.execute(comment.as_str());
match state {
true => {}
false => {
return JsonValue::from(state);
}
}
}
JsonValue::from(true)
}
fn table_update(&mut self, options: TableOptions) -> JsonValue {
let cache_key = format!("{}{}", self.default, options.table_name);
let table_fields_guard = match TABLE_FIELDS.read() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
if table_fields_guard.get(&cache_key).is_some() {
drop(table_fields_guard);
let mut table_fields_guard = match TABLE_FIELDS.write() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
table_fields_guard.remove(&cache_key);
} else {
drop(table_fields_guard);
}
let fields_list = self.table_info(&options.table_name);
let mut put = vec![];
let mut add = vec![];
let mut del = vec![];
let mut comments = vec![];
for (key, _) in fields_list.entries() {
if options.table_fields[key].is_empty() {
del.push(key);
}
}
for (name, field) in options.table_fields.entries() {
if !fields_list[name].is_empty() {
let old_info = &fields_list[name];
let new_field_sql = br_fields::field("pgsql", name, field.clone());
let old_comment = old_info["comment"].as_str().unwrap_or("");
let old_type = old_info["type"].as_str().unwrap_or("");
let new_comment = if let Some(idx) = new_field_sql.find("--") {
new_field_sql[idx + 2..].trim()
} else {
""
};
let comment_matches =
if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
let old_parts: Vec<&str> = old_comment.split('|').collect();
let new_parts: Vec<&str> = new_comment.split('|').collect();
if old_parts.len() >= 4 && new_parts.len() >= 4 {
old_parts[..4] == new_parts[..4]
} else {
old_comment == new_comment
}
} else if !old_comment.is_empty() && !new_comment.is_empty() {
let old_parts: Vec<&str> = old_comment.split('|').collect();
let new_parts: Vec<&str> = new_comment.split('|').collect();
if old_parts.len() >= 2
&& new_parts.len() >= 2
&& old_parts.len() == new_parts.len()
{
let old_filtered: Vec<&str> = old_parts
.iter()
.filter(|v| **v != "true" && **v != "false")
.copied()
.collect();
let new_filtered: Vec<&str> = new_parts
.iter()
.filter(|v| **v != "true" && **v != "false")
.copied()
.collect();
old_filtered == new_filtered
} else {
old_comment == new_comment
}
} else {
old_comment == new_comment
};
let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
let new_type = if sql_parts.len() > 1 {
sql_parts[1].to_lowercase()
} else {
String::new()
};
let type_matches = match old_type {
"integer" => {
new_type.contains("int")
&& !new_type.contains("bigint")
&& !new_type.contains("smallint")
}
"bigint" => new_type.contains("bigint"),
"smallint" => new_type.contains("smallint"),
"boolean" => new_type.contains("boolean"),
"text" => new_type.contains("text"),
"character varying" => {
if !new_type.contains("varchar") {
false
} else {
let old_len = old_info["max_length"].as_i64().unwrap_or(0);
let new_len = new_type
.trim_start_matches("varchar(")
.trim_end_matches(')')
.parse::<i64>()
.unwrap_or(0);
let matched = old_len == new_len || new_len == 0;
if !matched {
log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
}
old_len == new_len || new_len == 0
}
}
"character" => new_type.contains("char") && !new_type.contains("varchar"),
"numeric" => {
if !(new_type.contains("numeric") || new_type.contains("decimal")) {
false
} else {
let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
let inner = new_type
.replace("numeric(", "")
.replace("decimal(", "")
.replace(')', "");
let parts: Vec<&str> = inner.split(',').collect();
let new_prec = parts
.first()
.and_then(|s| s.trim().parse::<i64>().ok())
.unwrap_or(0);
let new_scale = parts
.get(1)
.and_then(|s| s.trim().parse::<i64>().ok())
.unwrap_or(0);
old_prec == new_prec && old_scale == new_scale
}
}
"double precision" => {
new_type.contains("double") || new_type.contains("float8")
}
"real" => new_type.contains("real") || new_type.contains("float4"),
"timestamp without time zone" | "timestamp with time zone" => {
new_type.contains("timestamp")
}
"date" => new_type.contains("date") && !new_type.contains("timestamp"),
"time without time zone" | "time with time zone" => {
new_type.contains("time") && !new_type.contains("timestamp")
}
"json" | "jsonb" => new_type.contains("json"),
"uuid" => new_type.contains("uuid"),
"bytea" => new_type.contains("bytea"),
_ => old_type == new_type,
};
if type_matches && comment_matches {
continue;
}
log::debug!(
"字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
options.table_name,
name,
type_matches,
old_type,
new_type,
comment_matches
);
put.push(name);
} else {
add.push(name);
}
}
for name in add.iter() {
let name = name.to_string();
let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
let rows = row.split("--").collect::<Vec<&str>>();
comments.push(format!(
r#"ALTER TABLE "{}" add {};"#,
options.table_name,
rows[0].trim()
));
if rows.len() > 1 {
comments.push(format!(
"COMMENT ON COLUMN {}.\"{}\" IS '{}';",
options.table_name,
name,
rows[1].trim()
));
}
}
for name in del.iter() {
comments.push(format!(
"ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
options.table_name, name
));
}
for name in put.iter() {
let name = name.to_string();
let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
let rows = row.split("--").collect::<Vec<&str>>();
let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
if sql[1].contains("BOOLEAN") {
let text = format!(
"ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
options.table_name, name
);
comments.push(text.clone());
let text = format!(
"ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
options.table_name, name, sql[1]
);
comments.push(text.clone());
} else {
let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
let new_type_lower = sql[1].to_lowercase();
let is_date_to_numeric = (old_col_type == "date"
|| old_col_type.contains("timestamp"))
&& (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
if is_date_to_numeric {
comments.push(format!(
"ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
options.table_name, name
));
comments.push(format!(
"ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
options.table_name, name, sql[1], name, name, name
));
} else {
let text = format!(
"ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
options.table_name, name, sql[1]
);
comments.push(text.clone());
}
};
if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
let default_value = rows[0][default_pos + 9..].trim();
if !default_value.is_empty() {
comments.push(format!(
"ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
options.table_name, name, default_value
));
}
}
let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
.as_str()
.unwrap_or("YES");
let old_is_required = old_is_nullable == "NO";
if old_is_required && name != options.table_key {
comments.push(format!(
"ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
options.table_name, name
));
}
if rows.len() > 1 {
comments.push(format!(
"COMMENT ON COLUMN {}.\"{}\" IS '{}';",
options.table_name,
name,
rows[1].trim()
));
}
}
let mut unique_new = vec![];
let mut index_new = vec![];
let mut primary_key = vec![];
let (_, index_list) = self.query(
format!(
"SELECT * FROM pg_indexes WHERE tablename = '{}'",
options.table_name
)
.as_str(),
);
for item in index_list.members() {
let key_name = item["indexname"].as_str().unwrap_or("");
let indexdef = item["indexdef"].to_string();
if indexdef.contains(
format!(
"CREATE UNIQUE INDEX {}_{} ON",
options.table_name, options.table_key
)
.as_str(),
) {
primary_key.push(key_name.to_string());
continue;
}
if indexdef.contains("CREATE UNIQUE INDEX") {
unique_new.push(key_name.to_string());
continue;
}
if indexdef.contains("CREATE INDEX") {
index_new.push(key_name.to_string());
continue;
}
}
if !options.table_unique.is_empty() {
let full_name = format!(
"{}_unique_{}",
options.table_name,
options.table_unique.join("_")
);
let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
let unique = format!(
"CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
name,
options.table_name,
options.table_unique.join(",")
);
if !unique_new.contains(&name) {
comments.push(unique);
}
unique_new.retain(|x| *x != name);
}
for row in options.table_index.iter() {
let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
let name = format!("{}_index_{}", options.table_name, &md5[..16]);
let index = format!(
"CREATE INDEX IF NOT EXISTS {} ON {} ({})",
name,
options.table_name,
row.join(",")
);
if !index_new.contains(&name) {
comments.push(index);
}
index_new.retain(|x| *x != name);
}
for item in unique_new {
if item.ends_with("_pkey") {
continue;
}
if item.starts_with("unique_") {
comments.push(format!(
"ALTER TABLE {} DROP CONSTRAINT {};\r\n",
options.table_name,
item.clone()
));
} else {
comments.push(format!("DROP INDEX {};\r\n", item.clone()));
}
}
for item in index_new {
if item.ends_with("_pkey") {
continue;
}
comments.push(format!("DROP INDEX {};\r\n", item.clone()));
}
if self.params.sql {
return JsonValue::from(comments.join(""));
}
if comments.is_empty() {
return JsonValue::from(-1);
}
for item in comments.iter() {
let (state, res) = self.execute(item.as_str());
match state {
true => {}
false => {
error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
return JsonValue::from(0);
}
}
}
JsonValue::from(1)
}
fn table_info(&mut self, table: &str) -> JsonValue {
let cache_key = format!("{}{}", self.default, table);
let table_fields_guard = match TABLE_FIELDS.read() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
if let Some(cached) = table_fields_guard.get(&cache_key) {
return cached.clone();
}
drop(table_fields_guard);
let sql = format!(
"SELECT COL.COLUMN_NAME,
COL.DATA_TYPE,
COL.IS_NULLABLE,
COL.CHARACTER_MAXIMUM_LENGTH,
COL.NUMERIC_PRECISION,
COL.NUMERIC_SCALE,
COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
LEFT JOIN
pg_catalog.pg_description DESCRIPTION
ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE COL.TABLE_NAME = '{table}'");
let (state, data) = self.query(sql.as_str());
let mut list = object! {};
if state {
for item in data.members() {
let mut row = object! {};
row["field"] = item["column_name"].clone();
row["comment"] = item["comment"].clone();
row["type"] = item["data_type"].clone();
row["is_nullable"] = item["is_nullable"].clone();
row["max_length"] = item["character_maximum_length"].clone();
row["numeric_precision"] = item["numeric_precision"].clone();
row["numeric_scale"] = item["numeric_scale"].clone();
if let Some(field_name) = row["field"].as_str() {
list[field_name] = row.clone();
}
}
let mut table_fields_guard = match TABLE_FIELDS.write() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
table_fields_guard.insert(cache_key, list.clone());
list
} else {
list
}
}
fn table_is_exist(&mut self, name: &str) -> bool {
let sql = format!("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = '{name}')");
let (state, data) = self.query(sql.as_str());
match state {
true => {
for item in data.members() {
if item.has_key("exists") {
return item["exists"].as_bool().unwrap_or(false);
}
}
false
}
false => false,
}
}
fn table(&mut self, name: &str) -> &mut Pgsql {
self.params = Params::default(self.connection.mode.str().as_str());
let table_name = format!("{}{}", self.connection.prefix, name);
if !super::sql_safety::validate_table_name(&table_name) {
error!("Invalid table name: {}", name);
}
self.params.table = table_name.clone();
self.params.join_table = table_name;
self
}
fn change_table(&mut self, name: &str) -> &mut Self {
self.params.join_table = name.to_string();
self
}
fn autoinc(&mut self) -> &mut Self {
self.params.autoinc = true;
self
}
fn timestamps(&mut self) -> &mut Self {
self.params.timestamps = true;
self
}
fn fetch_sql(&mut self) -> &mut Self {
self.params.sql = true;
self
}
fn order(&mut self, field: &str, by: bool) -> &mut Self {
self.params.order[field] = {
if by {
"DESC"
} else {
"ASC"
}
}
.into();
self
}
fn group(&mut self, field: &str) -> &mut Self {
let fields: Vec<&str> = field.split(",").collect();
for field in fields.iter() {
let field = field.to_string();
self.params.group[field.as_str()] = field.clone().into();
self.params.fields[field.as_str()] = field.clone().into();
}
self
}
fn distinct(&mut self) -> &mut Self {
self.params.distinct = true;
self
}
fn json(&mut self, field: &str) -> &mut Self {
let list: Vec<&str> = field.split(",").collect();
for item in list.iter() {
self.params.json[item.to_string().as_str()] = item.to_string().into();
}
self
}
fn location(&mut self, field: &str) -> &mut Self {
let list: Vec<&str> = field.split(",").collect();
for item in list.iter() {
self.params.location[item.to_string().as_str()] = item.to_string().into();
}
self
}
fn field(&mut self, field: &str) -> &mut Self {
let list: Vec<&str> = field.split(",").collect();
let join_table = if self.params.join_table.is_empty() {
self.params.table.clone()
} else {
self.params.join_table.clone()
};
for item in list.iter() {
let lower = item.to_lowercase();
let is_expr = lower.contains("count(")
|| lower.contains("sum(")
|| lower.contains("avg(")
|| lower.contains("max(")
|| lower.contains("min(")
|| lower.contains("case ");
if is_expr {
self.params.fields[item.to_string().as_str()] = (*item).into();
} else if item.contains(" as ") {
let text = item.split(" as ").collect::<Vec<&str>>();
self.params.fields[item.to_string().as_str()] =
format!("{}.{} as {}", join_table, text[0], text[1]).into();
} else {
self.params.fields[item.to_string().as_str()] =
format!("{join_table}.{item}").into();
}
}
self
}
fn field_raw(&mut self, expr: &str) -> &mut Self {
self.params.fields[expr] = expr.into();
self
}
fn hidden(&mut self, name: &str) -> &mut Self {
let hidden: Vec<&str> = name.split(",").collect();
let fields_list = self.table_info(self.params.clone().table.as_str());
let mut data = array![];
for item in fields_list.members() {
let _ = data.push(object! {
"name":item["field"].as_str().unwrap_or("")
});
}
for item in data.members() {
let name = item["name"].as_str().unwrap_or("");
if !hidden.contains(&name) {
self.params.fields[name] = name.into();
}
}
self
}
fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
for f in field.split('|') {
if !super::sql_safety::validate_field_name(f) {
error!("Invalid field name: {}", f);
}
}
if !super::sql_safety::validate_compare_orator(compare) {
error!("Invalid compare operator: {}", compare);
}
let join_table = if self.params.join_table.is_empty() {
self.params.table.clone()
} else {
self.params.join_table.clone()
};
if value.is_boolean() {
let bool_val = value.as_bool().unwrap_or(false);
self.params
.where_and
.push(format!("{join_table}.{field} {compare} {bool_val}"));
return self;
}
match compare {
"between" => {
self.params.where_and.push(format!(
"{}.{} between '{}' AND '{}'",
join_table, field, value[0], value[1]
));
}
"set" => {
let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
let mut wheredata = vec![];
for item in list.iter() {
wheredata.push(format!(
"'{item}' = ANY (string_to_array({join_table}.{field},','))"
));
}
self.params
.where_and
.push(format!("({})", wheredata.join(" or ")));
}
"notin" => {
let mut text = String::new();
for item in value.members() {
text = format!("{text},'{item}'");
}
text = text.trim_start_matches(",").into();
self.params
.where_and
.push(format!("{join_table}.{field} not in ({text})"));
}
"is" => {
self.params
.where_and
.push(format!("{join_table}.{field} is {value}"));
}
"isnot" => {
self.params
.where_and
.push(format!("{join_table}.{field} is not {value}"));
}
"notlike" => {
self.params
.where_and
.push(format!("{join_table}.{field} not like '{value}'"));
}
"in" => {
if value.is_array() && value.is_empty() {
self.params.where_and.push("1=0".to_string());
return self;
}
let mut text = String::new();
if value.is_array() {
for item in value.members() {
text = format!("{text},'{item}'");
}
} else if value.is_null() {
text = format!("{text},null");
} else {
let value = value.as_str().unwrap_or("");
let value: Vec<&str> = value.split(",").collect();
for item in value.iter() {
text = format!("{text},'{item}'");
}
}
text = text.trim_start_matches(",").into();
self.params
.where_and
.push(format!("{join_table}.{field} {compare} ({text})"));
}
"json_contains" => {
if value.is_array() {
if value.is_empty() {
self.params.where_and.push("1=0".to_string());
} else {
let mut parts = vec![];
for item in value.members() {
let escaped = super::sql_safety::escape_string(&item.to_string());
parts.push(format!(
"{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
escaped
));
}
self.params
.where_and
.push(format!("({})", parts.join(" OR ")));
}
} else {
let escaped = super::sql_safety::escape_string(&value.to_string());
self.params.where_and.push(format!(
"{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
escaped
));
}
}
_ => {
self.params
.where_and
.push(format!("{join_table}.{field} {compare} '{value}'"));
}
}
self
}
fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
for f in field.split('|') {
if !super::sql_safety::validate_field_name(f) {
error!("Invalid field name: {}", f);
}
}
if !super::sql_safety::validate_compare_orator(compare) {
error!("Invalid compare operator: {}", compare);
}
let join_table = if self.params.join_table.is_empty() {
self.params.table.clone()
} else {
self.params.join_table.clone()
};
if value.is_boolean() {
let bool_val = value.as_bool().unwrap_or(false);
self.params
.where_or
.push(format!("{join_table}.{field} {compare} {bool_val}"));
return self;
}
match compare {
"between" => {
self.params.where_or.push(format!(
"{}.{} between '{}' AND '{}'",
join_table, field, value[0], value[1]
));
}
"set" => {
let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
let mut wheredata = vec![];
for item in list.iter() {
wheredata.push(format!(
"'{item}' = ANY (string_to_array({join_table}.{field},','))"
));
}
self.params
.where_or
.push(format!("({})", wheredata.join(" or ")));
}
"notin" => {
let mut text = String::new();
for item in value.members() {
text = format!("{text},'{item}'");
}
text = text.trim_start_matches(",").into();
self.params
.where_or
.push(format!("{join_table}.{field} not in ({text})"));
}
"is" => {
self.params
.where_or
.push(format!("{join_table}.{field} is {value}"));
}
"isnot" => {
self.params
.where_or
.push(format!("{join_table}.{field} is not {value}"));
}
"in" => {
if value.is_array() && value.is_empty() {
self.params.where_or.push("1=0".to_string());
return self;
}
let mut text = String::new();
if value.is_array() {
for item in value.members() {
text = format!("{text},'{item}'");
}
} else {
let value = value.as_str().unwrap_or("");
let value: Vec<&str> = value.split(",").collect();
for item in value.iter() {
text = format!("{text},'{item}'");
}
}
text = text.trim_start_matches(",").into();
self.params
.where_or
.push(format!("{join_table}.{field} {compare} ({text})"));
}
"json_contains" => {
if value.is_array() {
if value.is_empty() {
self.params.where_or.push("1=0".to_string());
} else {
let mut parts = vec![];
for item in value.members() {
let escaped = super::sql_safety::escape_string(&item.to_string());
parts.push(format!(
"{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
escaped
));
}
self.params
.where_or
.push(format!("({})", parts.join(" OR ")));
}
} else {
let escaped = super::sql_safety::escape_string(&value.to_string());
self.params.where_or.push(format!(
"{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
escaped
));
}
}
_ => {
self.params
.where_or
.push(format!("{join_table}.{field} {compare} '{value}'"));
}
}
self
}
fn where_raw(&mut self, expr: &str) -> &mut Self {
self.params.where_and.push(expr.to_string());
self
}
fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
self.params
.where_and
.push(format!("\"{field}\" IN ({sub_sql})"));
self
}
fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
self.params
.where_and
.push(format!("\"{field}\" NOT IN ({sub_sql})"));
self
}
fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
self.params.where_and.push(format!("EXISTS ({sub_sql})"));
self
}
fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
self.params
.where_and
.push(format!("NOT EXISTS ({sub_sql})"));
self
}
fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
self.params.where_column = format!(
"{}.{} {} {}.{}",
self.params.table, field_a, compare, self.params.table, field_b
);
self
}
fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
self.params
.update_column
.push(format!("{field_a} = {compare}"));
self
}
fn page(&mut self, page: i32, limit: i32) -> &mut Self {
self.params.page = page;
self.params.limit = limit;
self
}
fn limit(&mut self, count: i32) -> &mut Self {
self.params.limit_only = count;
self
}
fn column(&mut self, field: &str) -> JsonValue {
self.field(field);
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql);
}
let (state, data) = self.query(sql.as_str());
match state {
true => {
let mut list = array![];
for item in data.members() {
if self.params.json[field].is_empty() {
let _ = list.push(item[field].clone());
} else {
let data =
json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
let _ = list.push(data);
}
}
list
}
false => {
array![]
}
}
}
fn count(&mut self) -> JsonValue {
self.params.fields = json::object! {};
self.params.fields["count"] = "count(*) as count".to_string().into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
if state {
data[0]["count"].clone()
} else {
JsonValue::from(0)
}
}
fn max(&mut self, field: &str) -> JsonValue {
self.params.fields[field] = format!("max({field}) as {field}").into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
if state {
if data.len() > 1 {
return data.clone();
}
data[0][field].clone()
} else {
JsonValue::from(0)
}
}
fn min(&mut self, field: &str) -> JsonValue {
self.params.fields[field] = format!("min({field}) as {field}").into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
if state {
if data.len() > 1 {
return data;
}
data[0][field].clone()
} else {
JsonValue::from(0)
}
}
fn sum(&mut self, field: &str) -> JsonValue {
self.params.fields[field] = format!("sum({field}) as {field}").into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
match state {
true => {
if data.len() > 1 {
return data;
}
data[0][field].clone()
}
false => JsonValue::from(0),
}
}
fn avg(&mut self, field: &str) -> JsonValue {
self.params.fields[field] = format!("avg({field}) as {field}").into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
if state {
if data.len() > 1 {
return data;
}
data[0][field].clone()
} else {
JsonValue::from(0)
}
}
fn having(&mut self, expr: &str) -> &mut Self {
self.params.having.push(expr.to_string());
self
}
fn select(&mut self) -> JsonValue {
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, mut data) = self.query(sql.as_str());
match state {
true => {
for (field, _) in self.params.json.entries() {
for item in data.members_mut() {
if !item[field].is_empty() {
let json = item[field].to_string();
item[field] = match json::parse(&json) {
Ok(e) => e,
Err(_) => JsonValue::from(json),
};
}
}
}
data.clone()
}
false => array![],
}
}
fn find(&mut self) -> JsonValue {
self.params.page = 1;
self.params.limit = 1;
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, mut data) = self.query(sql.as_str());
match state {
true => {
if data.is_empty() {
return object! {};
}
for (field, _) in self.params.json.entries() {
if !data[0][field].is_empty() {
let json = data[0][field].to_string();
let json = json::parse(&json).unwrap_or(array![]);
data[0][field] = json;
} else {
data[0][field] = array![];
}
}
data[0].clone()
}
false => {
object! {}
}
}
}
fn value(&mut self, field: &str) -> JsonValue {
self.params.fields = object! {};
self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
self.params.page = 1;
self.params.limit = 1;
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, mut data) = self.query(sql.as_str());
match state {
true => {
for (field, _) in self.params.json.entries() {
if !data[0][field].is_empty() {
let json = data[0][field].to_string();
let json = json::parse(&json).unwrap_or(array![]);
data[0][field] = json;
} else {
data[0][field] = array![];
}
}
data[0][field].clone()
}
false => {
if self.connection.debug {
info!("{data:?}");
}
JsonValue::Null
}
}
}
fn insert(&mut self, mut data: JsonValue) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut fields = vec![];
let mut values = vec![];
if !self.params.autoinc && data["id"].is_empty() {
let thread_id = format!("{:?}", std::thread::current().id());
let thread_num: u64 = thread_id
.trim_start_matches("ThreadId(")
.trim_end_matches(")")
.parse()
.unwrap_or(0);
data["id"] = format!(
"{:X}{:X}",
Local::now().timestamp_nanos_opt().unwrap_or(0),
thread_num
)
.into();
}
for (field, value) in data.entries() {
fields.push(format!("\"{}\"", field));
if value.is_string() {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
continue;
} else if value.is_array() {
if self.params.json[field].is_empty() {
let array = value
.members()
.map(|x| x.as_str().unwrap_or(""))
.collect::<Vec<&str>>()
.join(",");
values.push(format!("'{}'", array.replace("'", "''")));
} else {
let json = value.to_string();
let json = json.replace("'", "''");
values.push(format!("'{json}'"));
}
continue;
} else if value.is_object() {
if self.params.json[field].is_empty() {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
} else {
let json = value.to_string();
let json = json.replace("'", "''");
values.push(format!("'{json}'"));
}
continue;
} else if value.is_number() {
let col_type = fields_list[field]["type"].as_str().unwrap_or("");
if col_type == "boolean" {
let bool_val = value.as_i64().unwrap_or(0) != 0;
values.push(format!("{bool_val}"));
} else if col_type.contains("int") {
values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
} else {
values.push(format!("{value}"));
}
continue;
} else if value.is_boolean() || value.is_null() {
values.push(format!("{value}"));
continue;
} else {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
continue;
}
}
let fields = fields.join(",");
let values = values.join(",");
let sql = format!(
"INSERT INTO {} ({}) VALUES ({});",
self.params.table, fields, values
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, ids) = self.execute(sql.as_str());
match state {
true => match self.params.autoinc {
true => ids.clone(),
false => data["id"].clone(),
},
false => {
let thread_id = format!("{:?}", thread::current().id());
error!("添加失败: {thread_id} {ids:?} {sql}");
JsonValue::from("")
}
}
}
fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut fields = String::new();
if !self.params.autoinc && data[0]["id"].is_empty() {
data[0]["id"] = "".into();
}
for (field, _) in data[0].entries() {
fields = format!("{fields},\"{field}\"");
}
fields = fields.trim_start_matches(",").to_string();
let core_count = num_cpus::get();
let mut p = pools::Pool::new(core_count * 4);
let autoinc = self.params.autoinc;
for list in data.members() {
let mut item = list.clone();
let i = br_fields::str::Code::verification_code(3);
let fields_list_new = fields_list.clone();
p.execute(move |pcindex| {
if !autoinc && item["id"].is_empty() {
let id = format!(
"{:X}{:X}{}",
Local::now().timestamp_nanos_opt().unwrap_or(0),
pcindex,
i
);
item["id"] = id.into();
}
let mut values = "".to_string();
for (field, value) in item.entries() {
if value.is_string() {
values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
} else if value.is_number() {
let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
if col_type == "boolean" {
let bool_val = value.as_i64().unwrap_or(0) != 0;
values = format!("{values},{bool_val}");
} else if col_type.contains("int") {
values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
} else {
values = format!("{values},{value}");
}
} else if value.is_boolean() {
values = format!("{values},{value}");
continue;
} else {
values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
}
}
values = format!("({})", values.trim_start_matches(","));
array![item["id"].clone(), values]
});
}
let (ids_list, mut values) = p.insert_all();
values = values.trim_start_matches(",").to_string();
let sql = format!(
"INSERT INTO {} ({}) VALUES {};",
self.params.table, fields, values
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.execute(sql.as_str());
match state {
true => match autoinc {
true => data,
false => JsonValue::from(ids_list),
},
false => {
error!("insert_all: {data:?}");
array![]
}
}
}
fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut fields = vec![];
let mut values = vec![];
if !self.params.autoinc && data["id"].is_empty() {
let thread_id = format!("{:?}", std::thread::current().id());
let thread_num: u64 = thread_id
.trim_start_matches("ThreadId(")
.trim_end_matches(")")
.parse()
.unwrap_or(0);
data["id"] = format!(
"{:X}{:X}",
Local::now().timestamp_nanos_opt().unwrap_or(0),
thread_num
)
.into();
}
for (field, value) in data.entries() {
fields.push(format!("\"{}\"", field));
if value.is_string() {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
continue;
} else if value.is_array() {
if self.params.json[field].is_empty() {
let array = value
.members()
.map(|x| x.as_str().unwrap_or(""))
.collect::<Vec<&str>>()
.join(",");
values.push(format!("'{}'", array.replace("'", "''")));
} else {
let json = value.to_string();
let json = json.replace("'", "''");
values.push(format!("'{json}'"));
}
continue;
} else if value.is_object() {
if self.params.json[field].is_empty() {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
} else {
let json = value.to_string();
let json = json.replace("'", "''");
values.push(format!("'{json}'"));
}
continue;
} else if value.is_number() {
let col_type = fields_list[field]["type"].as_str().unwrap_or("");
if col_type == "boolean" {
let bool_val = value.as_i64().unwrap_or(0) != 0;
values.push(format!("{bool_val}"));
} else if col_type.contains("int") {
values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
} else {
values.push(format!("{value}"));
}
continue;
} else if value.is_boolean() || value.is_null() {
values.push(format!("{value}"));
continue;
} else {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
continue;
}
}
let conflict_cols: Vec<String> = conflict_fields
.iter()
.map(|f| format!("\"{}\"", f))
.collect();
let update_set: Vec<String> = fields
.iter()
.filter(|f| {
let name = f.trim_matches('"');
!conflict_fields.contains(&name) && name != "id"
})
.map(|f| format!("{f}=EXCLUDED.{f}"))
.collect();
let fields_str = fields.join(",");
let values_str = values.join(",");
let sql = format!(
"INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
self.params.table,
fields_str,
values_str,
conflict_cols.join(","),
update_set.join(",")
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, result) = self.execute(sql.as_str());
match state {
true => match self.params.autoinc {
true => result.clone(),
false => data["id"].clone(),
},
false => {
let thread_id = format!("{:?}", thread::current().id());
error!("upsert失败: {thread_id} {result:?} {sql}");
JsonValue::from("")
}
}
}
fn update(&mut self, data: JsonValue) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut values = vec![];
for (field, value) in data.entries() {
if value.is_string() {
values.push(format!(
"\"{}\"='{}'",
field,
value.to_string().replace("'", "''")
));
} else if value.is_number() {
let col_type = fields_list[field]["type"].as_str().unwrap_or("");
if col_type == "boolean" {
let bool_val = value.as_i64().unwrap_or(0) != 0;
values.push(format!("\"{field}\"= {bool_val}"));
} else if col_type.contains("int") {
values.push(format!(
"\"{}\"= {}",
field,
value.as_f64().unwrap_or(0.0) as i64
));
} else {
values.push(format!("\"{field}\"= {value}"));
}
} else if value.is_array() {
if self.params.json[field].is_empty() {
let array = value
.members()
.map(|x| x.as_str().unwrap_or(""))
.collect::<Vec<&str>>()
.join(",");
values.push(format!("\"{}\"='{}'", field, array.replace("'", "''")));
} else {
let json = value.to_string();
let json = json.replace("'", "''");
values.push(format!("\"{field}\"='{json}'"));
}
continue;
} else if value.is_object() {
if self.params.json[field].is_empty() {
values.push(format!(
"\"{}\"='{}'",
field,
value.to_string().replace("'", "''")
));
} else {
if value.is_empty() {
values.push(format!("\"{field}\"=''"));
continue;
}
let json = value.to_string();
let json = json.replace("'", "''");
values.push(format!("\"{field}\"='{json}'"));
}
continue;
} else if value.is_boolean() || value.is_null() {
values.push(format!("\"{field}\"= {value}"));
} else {
values.push(format!("\"{field}\"=\"{value}\""));
}
}
for (field, value) in self.params.inc_dec.entries() {
values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
}
if !self.params.update_column.is_empty() {
values.extend(self.params.update_column.clone());
}
let values = values.join(",");
let sql = format!(
"UPDATE {} SET {} {};",
self.params.table.clone(),
values,
self.params.where_sql()
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.execute(sql.as_str());
if state {
data
} else {
let thread_id = format!("{:?}", thread::current().id());
error!("update: {thread_id} {data:?} {sql}");
0.into()
}
}
fn update_all(&mut self, data: JsonValue) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut values = vec![];
let mut ids = vec![];
for (field, _) in data[0].entries() {
if field == "id" {
continue;
}
let col_type = fields_list[field]["type"].as_str().unwrap_or("");
let mut fields = vec![];
for row in data.members() {
let value = row[field].clone();
let id = row["id"].clone();
ids.push(id.clone());
if value.is_string() {
fields.push(format!(
"WHEN '{}' THEN '{}'",
id,
value.to_string().replace("'", "''")
));
} else if value.is_array() || value.is_object() {
if self.params.json[field].is_empty() {
fields.push(format!(
"WHEN '{}' THEN '{}'",
id,
value.to_string().replace("'", "''")
));
} else {
let json = value.to_string();
let json = json.replace("'", "''");
fields.push(format!("WHEN '{id}' THEN '{json}'"));
}
continue;
} else if value.is_number() {
if col_type == "boolean" {
let bool_val = value.as_i64().unwrap_or(0) != 0;
fields.push(format!("WHEN '{id}' THEN {bool_val}"));
} else {
fields.push(format!("WHEN '{id}' THEN {value}"));
}
} else if value.is_boolean() || value.is_null() {
fields.push(format!("WHEN '{id}' THEN {value}"));
} else {
fields.push(format!(
"WHEN '{}' THEN '{}'",
id,
value.to_string().replace("'", "''")
));
}
}
values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
}
self.where_and("id", "in", ids.into());
for (field, value) in self.params.inc_dec.entries() {
values.push(format!("{} = {}", field, value.to_string().clone()));
}
let values = values.join(",");
let sql = format!(
"UPDATE {} SET {} {} {};",
self.params.table.clone(),
values,
self.params.where_sql(),
self.params.page_limit_sql()
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.execute(sql.as_str());
if state {
data
} else {
error!("update_all: {data:?}");
JsonValue::from(0)
}
}
fn delete(&mut self) -> JsonValue {
let sql = format!(
"delete FROM {} {} {};",
self.params.table.clone(),
self.params.where_sql(),
self.params.page_limit_sql()
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.execute(sql.as_str());
match state {
true => data,
false => {
error!("delete 失败>>> {data:?}");
JsonValue::from(0)
}
}
}
fn transaction(&mut self) -> bool {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
let sp = format!("SAVEPOINT sp_{}", depth + 1);
let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
return true;
}
let mut conn = None;
for attempt in 0..3u8 {
match self.client.get_connect_for_transaction() {
Ok(mut c) => {
if c.is_valid() {
conn = Some(c);
break;
}
warn!("事务连接无效(第{}次)", attempt + 1);
self.client.release_transaction_conn();
}
Err(e) => {
warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
}
}
if attempt < 2 {
thread::sleep(std::time::Duration::from_millis(200));
}
}
let mut conn = match conn {
Some(c) => c,
None => {
error!("获取事务连接重试耗尽");
return false;
}
};
if let Err(e) = conn.execute("START TRANSACTION") {
error!("启动事务失败: {e}");
self.client.release_transaction_conn();
return false;
}
PGSQL_TRANSACTION_MANAGER.start(&key, conn);
true
}
fn commit(&mut self) -> bool {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
error!("commit: 没有活跃的事务");
return false;
}
let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
if depth > 1 {
let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &key);
return true;
}
let commit_result =
PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
let success = match commit_result {
Some(Ok(_)) => true,
Some(Err(e)) => {
error!("提交事务失败: {e}");
false
}
None => {
error!("提交事务失败: 未找到连接");
false
}
};
if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &key) {
self.client.release_transaction_conn_with_conn(conn);
} else {
self.client.release_transaction_conn();
}
success
}
fn rollback(&mut self) -> bool {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
error!("rollback: 没有活跃的事务");
return false;
}
let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
if depth > 1 {
let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &key);
return true;
}
let rollback_result =
PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
let success = match rollback_result {
Some(Ok(_)) => true,
Some(Err(e)) => {
error!("回滚失败: {e}");
false
}
None => {
error!("回滚失败: 未找到连接");
false
}
};
if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &key) {
self.client.release_transaction_conn_with_conn(conn);
} else {
self.client.release_transaction_conn();
}
success
}
fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
let (state, data) = self.query(sql);
match state {
true => Ok(data),
false => Err(data.to_string()),
}
}
fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
self.params = Params::default(self.connection.mode.str().as_str());
let (state, data) = self.execute(sql);
match state {
true => Ok(data),
false => Err(data.to_string()),
}
}
fn inc(&mut self, field: &str, num: f64) -> &mut Self {
self.params.inc_dec[field] = format!("{field} + {num}").into();
self
}
fn dec(&mut self, field: &str, num: f64) -> &mut Self {
self.params.inc_dec[field] = format!("{field} - {num}").into();
self
}
fn buildsql(&mut self) -> String {
self.fetch_sql();
let sql = self.select().to_string();
format!("( {} ) {}", sql, self.params.table)
}
fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
for field in fields {
self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
}
self
}
fn join(
&mut self,
main_table: &str,
main_fields: &str,
right_table: &str,
right_fields: &str,
) -> &mut Self {
let main_table = if main_table.is_empty() {
self.params.table.clone()
} else {
main_table.to_string()
};
self.params.join_table = right_table.to_string();
self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
self
}
fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
let main_fields = if main_fields.is_empty() {
"id"
} else {
main_fields
};
let second_fields = if second_fields.is_empty() {
self.params.table.clone()
} else {
second_fields.to_string().clone()
};
let sec_table_name = format!("{}{}", table, "_2");
let second_table = format!("{} {}", table, sec_table_name.clone());
self.params.join_table = sec_table_name.clone();
self.params.join.push(format!(
" INNER JOIN {} ON {}.{} = {}.{}",
second_table, self.params.table, main_fields, sec_table_name, second_fields
));
self
}
fn join_right(
&mut self,
main_table: &str,
main_fields: &str,
right_table: &str,
right_fields: &str,
) -> &mut Self {
let main_table = if main_table.is_empty() {
self.params.table.clone()
} else {
main_table.to_string()
};
self.params.join_table = right_table.to_string();
self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
self
}
fn join_full(
&mut self,
main_table: &str,
main_fields: &str,
right_table: &str,
right_fields: &str,
) -> &mut Self {
let main_table = if main_table.is_empty() {
self.params.table.clone()
} else {
main_table.to_string()
};
self.params.join_table = right_table.to_string();
self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
self
}
fn union(&mut self, sub_sql: &str) -> &mut Self {
self.params.unions.push(format!("UNION {sub_sql}"));
self
}
fn union_all(&mut self, sub_sql: &str) -> &mut Self {
self.params.unions.push(format!("UNION ALL {sub_sql}"));
self
}
fn lock_for_update(&mut self) -> &mut Self {
self.params.lock_mode = "FOR UPDATE".to_string();
self
}
fn lock_for_share(&mut self) -> &mut Self {
self.params.lock_mode = "FOR SHARE".to_string();
self
}
}