use crate::pools;
use crate::types::sqlite_transaction::SQLITE_TRANSACTION_MANAGER;
use crate::types::{DbMode, Mode, Params, TableOptions};
use crate::Connection;
use chrono::Local;
use json::{array, object, JsonValue};
use lazy_static::lazy_static;
use log::{error, info};
use sqlite::{Connection as Connect, ConnectionThreadSafe, OpenFlags, State, Statement, Type};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
// Process-wide shared state for the SQLite backend.
lazy_static! {
// Open thread-safe connections, keyed by the `default` name given to `Sqlite::connect`.
static ref DBS: Mutex<HashMap<String, Arc<ConnectionThreadSafe>>> = Mutex::new(HashMap::new());
// NOTE(review): not referenced in the visible part of this file — confirm it is used elsewhere.
static ref SQL_LIST: Mutex<Vec<String>> = Mutex::new(Vec::new());
// Column metadata per table name (column name -> `PRAGMA table_info` row); written by
// `table_info` and read by `query_handle_static` when decoding integer columns.
static ref FIELDS: Mutex<HashMap<String, JsonValue>> = Mutex::new(HashMap::new());
}
/// SQLite query builder / executor bound to one registered database connection.
#[derive(Clone, Debug)]
pub struct Sqlite {
/// Connection settings this instance was created from (DSN, debug flag, table prefix, ...).
pub connection: Connection,
/// Key under which the opened database handle is stored in the shared `DBS` map.
pub default: String,
/// State of the statement currently being built (table, fields, where clauses, ...).
pub params: Params,
}
impl Sqlite {
pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
let dsn = connection.clone().get_dsn();
let db_path = Path::new(&dsn);
if let Some(parent) = db_path.parent() {
if !parent.as_os_str().is_empty() && !parent.exists() {
if let Err(e) = std::fs::create_dir_all(parent) {
error!("sqlite 创建目录失败: {} {:?}", e, parent);
return Err(e.to_string());
}
info!("sqlite 自动创建目录: {:?}", parent);
}
}
let flags = OpenFlags::new().with_create().with_read_write();
match Connect::open_thread_safe_with_flags(dsn.as_str(), flags) {
Ok(e) => {
if let Ok(mut guard) = DBS.lock() {
guard.insert(default.clone(), Arc::new(e));
} else {
error!("sqlite 获取数据库锁失败");
return Err("获取数据库锁失败".to_string());
}
Ok(Self {
connection: connection.clone(),
default: default.clone(),
params: Params::default("sqlite"),
})
}
Err(e) => {
error!("sqlite 启动失败: {} {}", e, dsn.as_str());
Err(e.to_string())
}
}
}
/// Steps through all rows of a prepared statement and converts them into a JSON array
/// of objects keyed by column name.
///
/// * `statement` - prepared statement to iterate.
/// * `sql` - original SQL text, used only in error logs.
/// * `table` - table name used to look up cached column metadata in `FIELDS`.
/// * `thread_id` - caller thread id, used for logging and the transaction check.
///
/// Returns `(true, rows)` when iteration finishes, or `(false, rows_so_far)` when
/// stepping the statement fails.
fn query_handle_static(
mut statement: Statement,
sql: &str,
table: &str,
thread_id: &str,
) -> (bool, JsonValue) {
let mut data = array![];
while let State::Row = match statement.next() {
Ok(e) => e,
Err(e) => {
// The transaction state only selects which log message is written.
let in_transaction = SQLITE_TRANSACTION_MANAGER.is_in_transaction(thread_id);
if in_transaction {
error!("{} 查询事务: {} {}", thread_id, e, sql);
} else {
error!("{} 非事务查询: {} {}", thread_id, e, sql);
}
return (false, data);
}
} {
let mut list = object! {};
// Positional column index; it must advance once per column, including skipped ones.
let mut index = 0;
for field in statement.column_names().iter() {
// A column name that already holds a non-null value in this row is not overwritten.
if !list[field.as_str()].is_null() {
index += 1;
continue;
}
match statement.column_type(field.as_str()) {
Ok(types) => match types {
Type::String => {
if let Ok(data) = statement.read::<String, _>(index) {
// The literal texts "true"/"false" are surfaced as JSON booleans.
match data.as_str() {
"false" => {
list[field.as_str()] = JsonValue::from(false);
}
"true" => {
list[field.as_str()] = JsonValue::from(true);
}
_ => {
list[field.as_str()] = JsonValue::from(data.clone());
}
}
}
}
Type::Integer => {
// Integer decoding depends on the declared column type cached for `table`.
let fields_cache =
FIELDS.lock().ok().and_then(|g| g.get(table).cloned());
if let Some(fields) = fields_cache {
// No metadata for this column (e.g. an alias or aggregate): plain integer.
if fields[field.clone()].is_empty() {
if let Ok(data) = statement.read::<i64, _>(index) {
list[field.as_str()] = data.into();
}
index += 1;
continue;
}
let field_type =
fields[field.clone()]["type"].as_str().unwrap_or("");
match field_type {
// Columns declared exactly as INTEGER are returned as booleans (value == 1).
"INTEGER" => {
if let Ok(data) = statement.read::<i64, _>(index) {
list[field.as_str()] = JsonValue::from(data == 1);
}
}
x if x.contains("int(") => {
if let Ok(data) = statement.read::<i64, _>(index) {
list[field.as_str()] = data.into();
}
}
// decimal(n,0) columns are read as floating point.
x if x.contains("decimal(") && x.ends_with(",0)") => {
if let Ok(data) = statement.read::<f64, _>(index) {
list[field.as_str()] = data.into();
}
}
_ => {
if let Ok(data) = statement.read::<i64, _>(index) {
list[field.as_str()] = data.into();
}
}
}
} else if let Ok(data) = statement.read::<i64, _>(index) {
// No cached metadata for the table at all: plain integer.
list[field.as_str()] = JsonValue::from(data);
}
}
Type::Float => {
if let Ok(data) = statement.read::<f64, _>(index) {
list[field.as_str()] = JsonValue::from(data);
}
}
Type::Binary => {
if let Ok(data) = statement.read::<String, _>(index) {
list[field.as_str()] = JsonValue::from(data.clone());
}
}
// NULL columns: try text, then float, then integer; a zero result becomes "".
Type::Null => match statement.read::<String, _>(index) {
Ok(data) => {
list[field.as_str()] = JsonValue::from(data.clone());
}
Err(_) => match statement.read::<f64, _>(index) {
Ok(data) => {
if data == 0.0 {
list[field.as_str()] = JsonValue::from("");
} else {
list[field.as_str()] = JsonValue::from(data);
}
}
Err(_) => match statement.read::<i64, _>(index) {
Ok(data) => {
if data == 0 {
list[field.as_str()] = JsonValue::from("");
} else {
list[field.as_str()] = JsonValue::from(data);
}
}
Err(e) => {
error!("Type:{} {:?}", field.as_str(), e);
}
},
},
},
},
Err(e) => {
error!("query Err: {e:?}");
}
}
index += 1;
}
let _ = data.push(list);
}
(true, data)
}
pub fn query(&mut self, sql: String) -> (bool, JsonValue) {
let thread_id = format!("{:?}", thread::current().id());
let table = self.params.table.clone();
if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
if self.connection.debug {
info!("{} 查询事务: sql: {:?}", thread_id, sql.clone());
}
let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
match db.prepare(sql.clone()) {
Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
Err(e) => {
error!("{} 查询事务: Err: {} {}", thread_id, e, sql.clone());
(false, e.to_string().into())
}
}
});
match result {
Some(r) => r,
None => {
error!("{thread_id} 未找到事务连接\r\nSQL: {sql}");
(false, JsonValue::from("未找到事务连接"))
}
}
} else {
if self.connection.debug {
info!("{} 非事务查询: sql: {:?}", thread_id, sql.clone());
}
let dbs = match DBS.lock() {
Ok(dbs) => dbs,
Err(e) => {
error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
return (false, JsonValue::from("数据库锁定失败"));
}
};
let db = match dbs.get(&self.default) {
Some(db) => db.clone(),
None => {
error!(
"{thread_id} 未找到默认数据库配置: {}\r\nSQL: {sql}",
self.default
);
return (false, JsonValue::from("未找到默认数据库配置"));
}
};
drop(dbs);
let result = match db.prepare(sql.clone()) {
Ok(statement) => Self::query_handle_static(statement, &sql, &table, &thread_id),
Err(e) => {
error!("{thread_id} 查询非事务: Err: {e}");
(false, e.to_string().into())
}
};
result
}
}
pub fn execute(&mut self, sql: String) -> (bool, JsonValue) {
let thread_id = format!("{:?}", thread::current().id());
if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
if self.connection.debug {
info!("{} 执行事务: sql: {}", thread_id, sql.clone());
}
let result = SQLITE_TRANSACTION_MANAGER.with_conn(&thread_id, |db| {
match db.execute(sql.clone()) {
Ok(_) => {
let count = db.change_count();
(true, JsonValue::from(count))
}
Err(e) => (false, JsonValue::from(e.to_string())),
}
});
match result {
Some((true, count)) => {
if self.connection.debug {
info!("{} count:{}", thread_id, count);
}
(true, count)
}
Some((false, err)) => {
error!(
"{} 执行事务: \r\nErr: {}\r\n{}",
thread_id,
err,
sql.clone()
);
(false, err)
}
None => {
error!("{} 未找到事务连接\r\nSQL: {}", thread_id, sql);
(false, JsonValue::from("未找到事务连接"))
}
}
} else {
if self.connection.debug {
info!("{} 执行非事务: \r\nsql: {}", thread_id, sql.clone());
}
let dbs = match DBS.lock() {
Ok(dbs) => dbs,
Err(e) => {
error!("{thread_id} 获取数据库锁失败: {e}\r\nSQL: {sql}");
return (false, JsonValue::from("数据库锁定失败"));
}
};
let db = match dbs.get(&self.default) {
Some(db) => db.clone(),
None => {
error!(
"{} 未找到默认数据库配置: {}\r\nSQL: {}",
thread_id, self.default, sql
);
return (false, JsonValue::from("未找到默认数据库配置"));
}
};
drop(dbs);
match db.execute(sql.clone()) {
Ok(_) => {
let count = db.change_count();
if self.connection.debug {
info!(
"{} count: {} total_count: {}",
thread_id,
count,
db.total_change_count()
);
}
(true, JsonValue::from(count))
}
Err(e) => {
error!(
"{} 执行非事务: Err: {}\r\nSQL: {}",
thread_id,
e,
sql.clone()
);
(false, JsonValue::from(e.to_string()))
}
}
}
}
}
impl DbMode for Sqlite {
/// Lists the names of all tables in the database, ordered by name.
///
/// Returns an empty array when the lookup fails.
fn database_tables(&mut self) -> JsonValue {
    let listing = "select name from sqlite_master where type='table' order by name;";
    match self.sql(listing) {
        Ok(rows) => rows
            .members()
            .map(|row| row["name"].clone())
            .collect::<Vec<JsonValue>>()
            .into(),
        Err(_) => array![],
    }
}
fn database_create(&mut self, name: &str) -> bool {
let current_dsn = self.connection.clone().get_dsn();
let current_path = Path::new(¤t_dsn);
let new_path = if let Some(parent) = current_path.parent() {
if parent.as_os_str().is_empty() {
Path::new(name).to_path_buf()
} else {
parent.join(name)
}
} else {
Path::new(name).to_path_buf()
};
if let Some(parent) = new_path.parent() {
if !parent.as_os_str().is_empty() && !parent.exists() {
if let Err(e) = std::fs::create_dir_all(parent) {
error!("sqlite database_create 创建目录失败: {} {:?}", e, parent);
return false;
}
}
}
let flags = OpenFlags::new().with_create().with_read_write();
match Connect::open_thread_safe_with_flags(new_path.to_string_lossy().as_ref(), flags) {
Ok(_) => true,
Err(e) => {
error!(
"sqlite database_create 创建数据库失败: {} {:?}",
e, new_path
);
false
}
}
}
/// Deletes every row of `table` and reports whether the statement succeeded.
fn truncate(&mut self, table: &str) -> bool {
    self.execute(format!("DELETE FROM `{table}`")).0
}
}
impl Mode for Sqlite {
/// Creates the table described by `options` if it does not exist yet.
///
/// In `fetch_sql` mode the generated statements are returned as one string and
/// nothing is executed. Otherwise returns `true`/`false` for the `CREATE TABLE`
/// result; the unique index and the secondary indexes are created only when the
/// table did not exist before the call.
fn table_create(&mut self, options: TableOptions) -> JsonValue {
    // One unique index covering every column listed in `table_unique`.
    let unique_cols: Vec<String> = options
        .table_unique
        .iter()
        .map(|col| col.to_string())
        .collect();
    let unique = if unique_cols.is_empty() {
        String::new()
    } else {
        let unique_fields = unique_cols
            .iter()
            .map(|col| format!("`{col}`"))
            .collect::<Vec<_>>()
            .join(",");
        let unique_name = format!("unique_{}", unique_cols.join("_"));
        format!(
            "CREATE UNIQUE INDEX IF NOT EXISTS {} on {} ({});\r\n",
            unique_name, options.table_name, unique_fields
        )
    };

    // One secondary index per entry of `table_index`.
    let index: Vec<String> = options
        .table_index
        .iter()
        .map(|row| {
            let cols: Vec<String> = row.iter().map(|col| col.to_string()).collect();
            let index_fields = cols
                .iter()
                .map(|col| format!("`{col}`"))
                .collect::<Vec<_>>()
                .join(",");
            let index_name = if cols.is_empty() {
                String::new()
            } else {
                format!("index_{}", cols.join("_"))
            };
            format!(
                "CREATE INDEX IF NOT EXISTS {} on {} ({});\r\n",
                index_name, options.table_name, index_fields
            )
        })
        .collect();

    // Column definitions, one per line.
    let mut columns = String::new();
    for (name, field) in options.table_fields.entries() {
        let row = br_fields::field("sqlite", name, field.clone());
        columns.push_str(&format!(" {row},\r\n"));
    }
    let sql = format!(
        "CREATE TABLE IF NOT EXISTS `{}` (\r\n{}\r\n);\r\n",
        options.table_name,
        columns.trim_end_matches(",\r\n")
    );

    if self.params.sql {
        // An empty `unique` contributes nothing to the script.
        let mut script = sql;
        script.push_str(&unique);
        for statement in &index {
            script.push_str(statement);
        }
        return JsonValue::from(script);
    }

    let thread_id = format!("{:?}", thread::current().id());
    let (_, table_exists) = self.query(format!(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='{}';",
        options.table_name
    ));
    let is_new_table = table_exists.is_empty();
    let (created, _) = self.execute(sql);
    if !created {
        return JsonValue::from(false);
    }
    if is_new_table {
        if !unique.is_empty() {
            let (state, _) = self.execute(unique.clone());
            info!(
                "{} {} 唯一索引创建:{}",
                thread_id, options.table_name, state
            );
        }
        for statement in index.iter() {
            let (state, _) = self.execute(statement.clone());
            info!("{} {} 索引创建:{}", thread_id, options.table_name, state);
        }
    }
    JsonValue::from(true)
}
/// Migrates an existing table to the definition in `options` by rebuilding it:
/// create `<table>_tmp`, copy the shared columns, drop the old table, rename the
/// temporary table, then recreate the indexes.
///
/// Returns the generated SQL as a string in `fetch_sql` mode, `-1` when no difference
/// was detected, `1` on success and `0` when any step fails.
fn table_update(&mut self, options: TableOptions) -> JsonValue {
let thread_id = format!("{:?}", thread::current().id());
let mut sql = String::new();
// Columns to add, to delete, and to change.
let mut add = vec![];
let mut del = vec![];
let mut put = vec![];
let (_, mut fields_list) =
self.query(format!("pragma table_info ('{}')", options.table_name));
// Existing columns keyed by name, with the quotes stripped from the stored default.
let mut field_old = object! {};
for item in fields_list.members_mut() {
item["dflt_value"] = item["dflt_value"]
.to_string()
.trim_start_matches("'")
.trim_end_matches("'")
.into();
if let Some(name) = item["name"].as_str() {
field_old[name] = item.clone();
// Present in the database but absent from the new definition.
if options.table_fields[name].is_empty() {
del.push(name.to_string());
}
}
}
// Columns present on both sides; these are the ones copied into the rebuilt table.
let mut fields_list = vec![];
let mut fields_list_new = vec![];
for (name, field) in options.table_fields.entries() {
if field_old[name].is_empty() {
add.push(name.to_string());
} else {
fields_list.push(name.to_string());
fields_list_new.push(name.to_string());
let field_mode = field["mode"].as_str().unwrap_or("");
// Normalise the stored default so it can be compared with the declared `def`.
let old_value = match field_mode {
"select" => {
if field_old[name]["dflt_value"].clone().is_empty() {
"".to_string()
} else {
field_old[name]["dflt_value"].clone().to_string()
}
}
"switch" => (field_old[name]["dflt_value"]
.to_string()
.parse::<i32>()
.unwrap_or(0)
== 1)
.to_string(),
_ => field_old[name]["dflt_value"].clone().to_string(),
};
let new_value = match field_mode {
"select" => field["def"]
.members()
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join(","),
_ => field["def"].clone().to_string(),
};
if old_value != new_value {
info!(
"{} 差异化当前: {} old_value: {} new_value: {} {}",
options.table_name,
name,
old_value,
new_value,
old_value != new_value
);
info!("差异化更新: {} {:#} {:#}", name, field_old[name], field);
put.push(name.to_string());
} else if field_old[name]["pk"].as_i64().unwrap_or(0) == 1
&& name != options.table_key
{
// The column is a primary key in the database but is not the configured key.
info!("{} 主键替换: {}", options.table_name, name);
put.push(name.to_string());
}
}
}
// One unique index over all `table_unique` columns; only the value from the last
// iteration (covering every column) is kept.
let mut unique_fields = String::new();
let mut unique_name = String::new();
let mut unique = String::new();
for item in options.table_unique.iter() {
if unique_fields.is_empty() {
unique_fields = format!("`{item}`");
unique_name = format!("unique_{item}");
} else {
unique_fields = format!("{unique_fields},`{item}`");
unique_name = format!("{unique_name}_{item}");
}
unique = format!(
"CREATE UNIQUE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
options.table_name, unique_name, options.table_name, unique_fields
)
}
// One secondary index per `table_index` entry.
let mut index = vec![];
for row in options.table_index.iter() {
let mut index_fields = String::new();
let mut index_name = String::new();
for item in row.iter() {
if index_fields.is_empty() {
index_fields = item.to_string();
index_name = format!("index_{item}");
} else {
index_fields = format!("{index_fields},{item}");
index_name = format!("{index_name}_{item}");
}
}
index.push(format!(
"CREATE INDEX IF NOT EXISTS {}_{} on {} ({});\r\n",
options.table_name, index_name, options.table_name, index_fields
));
}
for (name, field) in options.table_fields.entries() {
let row = br_fields::field("sqlite", name, field.clone());
sql = format!("{sql} {row},\r\n");
}
// Compare the wanted indexes with the existing ones; when everything already
// matches, `unique` and `index` are cleared so they do not count as a difference.
if !unique.is_empty() || !index.is_empty() {
let unique_text = unique.clone();
let (_, unique_old) =
self.query(format!("PRAGMA index_list({});\r\n", options.table_name));
let mut index_old_list = vec![];
let mut index_new_list = vec![];
for item in unique_old.members() {
let origin = item["origin"].as_str().unwrap_or("");
let unique_1 = item["unique"].as_usize().unwrap_or(0);
let name = item["name"].as_str().unwrap_or("");
// Existing unique index created via CREATE INDEX (origin "c").
if origin == "c" && unique_1 == 1 {
if unique.contains(format!(" {name} ").as_str()) {
unique = "".to_string();
}
continue;
}
// Existing non-unique index created via CREATE INDEX.
if origin == "c" && unique_1 == 0 {
index_old_list.push(item);
for item in index.iter() {
if item.contains(format!(" {name} ").as_str()) {
index_new_list.push(item.clone());
}
}
continue;
}
}
if unique.is_empty() {
if index_old_list.len() == index.len()
&& index_old_list.len() == index_new_list.len()
{
index = vec![];
} else {
// The table will be rebuilt for the indexes, so the unique index must be recreated.
unique = unique_text;
}
}
}
sql = sql.trim_end_matches(",\r\n").to_string();
sql = format!(
"CREATE TABLE {}_tmp (\r\n{}\r\n);\r\n",
options.table_name, sql
);
// `{00}` is positional argument 0, i.e. the table name again.
let sqls = format!(
"replace INTO {}_tmp (`{}`) select `{}` from {00};\r\n",
options.table_name,
fields_list_new.join("`,`"),
fields_list.join("`,`")
);
let drop_sql = format!("drop table {};\r\n", options.table_name);
let alter_sql = format!("alter table {}_tmp rename to {00};\r\n", options.table_name);
let drop_sql_temp = format!("drop table {}_tmp;\r\n", options.table_name);
if self.params.sql {
let mut list = vec![sql, sqls, drop_sql, alter_sql, drop_sql_temp];
if !unique.is_empty() {
list.push(unique)
}
if !index.is_empty() {
list.extend(index)
}
return JsonValue::from(list.join(""));
}
// Nothing to add, delete, change or index: report "no change".
if add.is_empty()
&& del.is_empty()
&& unique.is_empty()
&& index.is_empty()
&& put.is_empty()
{
return JsonValue::from(-1);
}
// Step 1: create the temporary table.
let (state, _) = self.execute(sql.clone());
let data = match state {
true => {
// Step 2: copy the shared columns.
let (state, _) = self.execute(sqls.clone());
match state {
true => {
// Step 3: drop the original table.
let (state, _) = self.execute(drop_sql);
match state {
true => {
// Step 4: rename the temporary table into place.
let (state, _) = self.execute(alter_sql);
match state {
true => {
// Step 5: recreate the indexes. A failed unique index is only logged;
// a failed secondary index aborts with 0.
if !unique.is_empty() {
let (state, _) = self.execute(unique.clone());
info!(
"{} {} 唯一索引创建:{}",
thread_id, options.table_name, state
);
}
for index_sql in index.iter() {
let (state, _) = self.execute(index_sql.clone());
match state {
true => {}
false => {
error!(
"{} 索引创建失败: {} {}",
options.table_name, state, index_sql
);
return JsonValue::from(0);
}
};
}
return JsonValue::from(1);
}
false => {
error!("{} 修改表名失败", options.table_name);
return JsonValue::from(0);
}
}
}
false => {
error!("{} 删除本表失败", options.table_name);
return JsonValue::from(0);
}
}
}
false => {
error!(
"{} 添加tmp表记录失败 {:#} {:#}",
options.table_name, sql, sqls
);
// Clean up the temporary table so a later attempt can recreate it.
let sql = format!("drop table {}_tmp", options.table_name);
let (_, _) = self.execute(sql);
0
}
}
}
false => {
error!("{} 创建TMP表失败 {:#}", options.table_name, sql);
// Remove any temporary table left over from an earlier attempt.
let (_, _) = self.execute(drop_sql_temp);
0
}
};
JsonValue::from(data)
}
/// Returns the column metadata of `table` as an object keyed by column name, where
/// each value is the corresponding `PRAGMA table_info` row.
///
/// The result is cached in `FIELDS`. The same name-keyed object is returned on the
/// first and on cached calls; previously the first call returned the raw row array.
/// Empty metadata (table not found) is not cached, so a table created later is
/// picked up on the next call. Returns an empty object when the query fails.
fn table_info(&mut self, table: &str) -> JsonValue {
    if let Ok(guard) = FIELDS.lock() {
        if let Some(cached) = guard.get(table) {
            return cached.clone();
        }
    }
    let (state, data) = self.query(format!("PRAGMA table_info({table})"));
    if !state {
        return object! {};
    }
    let mut fields = object! {};
    for item in data.members() {
        if let Some(name) = item["name"].as_str() {
            fields[name] = item.clone();
        }
    }
    // Caching an empty result would hide the columns of a table created afterwards.
    if !fields.is_empty() {
        if let Ok(mut guard) = FIELDS.lock() {
            guard.insert(table.to_string(), fields.clone());
        }
    }
    fields
}
/// Reports whether a table called `name` exists in the database.
///
/// Returns `false` when the lookup itself fails.
fn table_is_exist(&mut self, name: &str) -> bool {
    let (state, data) = self.query(format!(
        "SELECT count(*) as count FROM sqlite_master WHERE type='table' AND name='{name}'"
    ));
    state && !data.is_empty() && data[0]["count"].as_i64().unwrap_or(0) > 0
}
/// Starts a new statement on table `name` (with the connection prefix prepended),
/// resetting all previously accumulated builder state.
///
/// An invalid table name is logged but the builder is still returned.
fn table(&mut self, name: &str) -> &mut Sqlite {
    let mode = self.connection.mode.str();
    self.params = Params::default(mode.as_str());
    let prefixed = format!("{}{}", self.connection.prefix, name);
    if !super::sql_safety::validate_table_name(&prefixed) {
        error!("Invalid table name: {}", name);
    }
    self.params.join_table = prefixed.clone();
    self.params.table = prefixed;
    self
}
/// Switches the table that subsequent field and where clauses are qualified with.
fn change_table(&mut self, name: &str) -> &mut Self {
    self.params.join_table = name.to_owned();
    self
}
/// Marks the statement as targeting a table with an auto-increment id, so `insert`
/// does not generate an id itself.
fn autoinc(&mut self) -> &mut Self {
self.params.autoinc = true;
self
}
/// Sets the `timestamps` flag on the statement being built.
// NOTE(review): the flag is not read in the visible part of this file — confirm its effect.
fn timestamps(&mut self) -> &mut Self {
self.params.timestamps = true;
self
}
/// Switches the builder to "return SQL" mode: terminal methods that check this flag
/// return the generated SQL text instead of executing it.
fn fetch_sql(&mut self) -> &mut Self {
self.params.sql = true;
self
}
/// Adds an ordering on `field`: descending when `by` is `true`, ascending otherwise.
fn order(&mut self, field: &str, by: bool) -> &mut Self {
    let direction = if by { "DESC" } else { "ASC" };
    self.params.order[field] = direction.into();
    self
}
/// Groups by each comma-separated column in `field`; every grouped column is also
/// added to the selected fields.
fn group(&mut self, field: &str) -> &mut Self {
    for column in field.split(",") {
        self.params.group[column] = column.into();
        self.params.fields[column] = column.into();
    }
    self
}
/// Sets the `distinct` flag on the statement being built.
fn distinct(&mut self) -> &mut Self {
self.params.distinct = true;
self
}
/// Registers each comma-separated column in `field` as holding JSON text, so that
/// readers decode it and `insert` escapes it.
fn json(&mut self, field: &str) -> &mut Self {
    for column in field.split(",") {
        self.params.json[column] = column.into();
    }
    self
}
/// Registers each comma-separated column in `field` in the builder's `location` set.
fn location(&mut self, field: &str) -> &mut Self {
    for column in field.split(",") {
        self.params.location[column] = column.into();
    }
    self
}
/// Adds the comma-separated columns in `field` to the select list, qualified with the
/// current join table (or the main table when no join table is set).
///
/// An entry of the form `col as alias` is emitted as a quoted column with a quoted alias.
fn field(&mut self, field: &str) -> &mut Self {
    let owner = if self.params.join_table.is_empty() {
        self.params.table.clone()
    } else {
        self.params.join_table.clone()
    };
    for entry in field.split(",") {
        let rendered = if entry.contains(" as ") {
            // `contains` guarantees at least two parts; only the first two are used.
            let mut parts = entry.split(" as ");
            let source = parts.next().unwrap_or("");
            let alias = parts.next().unwrap_or("");
            format!("{}.`{}` as `{}`", owner, source, alias)
        } else {
            format!("{owner}.`{entry}`")
        };
        self.params.fields[entry] = rendered.into();
    }
    self
}
fn field_raw(&mut self, expr: &str) -> &mut Self {
self.params.fields[expr] = expr.into();
self
}
/// Selects every column of the current table except the comma-separated names in `name`.
///
/// The column list is read from `PRAGMA table_info` on each call.
fn hidden(&mut self, name: &str) -> &mut Self {
    let (_, columns) = self.query(format!("PRAGMA table_info({})", self.params.table));
    let visible = columns
        .members()
        .filter_map(|column| column["name"].as_str())
        .filter(|column| !name.split(",").any(|excluded| excluded == *column));
    for column in visible {
        self.params.fields[column] = column.into();
    }
    self
}
fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
for f in field.split('|') {
if !super::sql_safety::validate_field_name(f) {
error!("Invalid field name: {}", f);
}
}
if !super::sql_safety::validate_compare_orator(compare) {
error!("Invalid compare operator: {}", compare);
}
let join_table = if self.params.join_table.is_empty() {
self.params.table.clone()
} else {
self.params.join_table.clone()
};
if value.is_boolean() {
if value.as_bool().unwrap_or(false) {
value = 1.into();
} else {
value = 0.into();
}
}
match compare {
"between" => {
self.params.where_and.push(format!(
"{}.`{}` between '{}' AND '{}'",
join_table, field, value[0], value[1]
));
}
"set" => {
let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
let mut wheredata = vec![];
for item in list.iter() {
wheredata.push(format!("{join_table}.`{field}` like '%{item}%'"));
}
self.params
.where_and
.push(format!("({})", wheredata.join(" or ")));
}
"notin" => {
let mut text = String::new();
for item in value.members() {
text = format!("{text},'{item}'");
}
text = text.trim_start_matches(",").into();
self.params
.where_and
.push(format!("{join_table}.`{field}` not in ({text})"));
}
"in" => {
let mut text = String::new();
if value.is_array() {
for item in value.members() {
text = format!("{text},'{item}'");
}
} else {
let value = value.to_string();
let value: Vec<&str> = value.split(",").collect();
for item in value.iter() {
text = format!("{text},'{item}'");
}
}
text = text.trim_start_matches(",").into();
self.params
.where_and
.push(format!("{join_table}.`{field}` {compare} ({text})"));
}
"=" => {
if value.is_null() {
self.params
.where_and
.push(format!("{}.`{}` {} {}", join_table, field, "IS", value));
} else {
self.params
.where_and
.push(format!("{join_table}.`{field}` {compare} '{value}'"));
}
}
"json_contains" => {
if value.is_array() {
if value.is_empty() {
self.params.where_and.push("1=0".to_string());
} else {
let mut parts = vec![];
for item in value.members() {
let escaped = super::sql_safety::escape_string(&item.to_string());
parts
.push(format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped));
}
self.params
.where_and
.push(format!("({})", parts.join(" OR ")));
}
} else {
let escaped = super::sql_safety::escape_string(&value.to_string());
self.params
.where_and
.push(format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped));
}
}
_ => {
if value.is_null() {
self.params
.where_and
.push(format!("{join_table}.`{field}` {compare} {value}"));
} else {
self.params
.where_and
.push(format!("{join_table}.`{field}` {compare} '{value}'"));
}
}
}
self
}
/// Adds an OR condition `field <compare> value` to the statement.
///
/// Invalid field names or operators are logged but the condition is still added.
/// Boolean values are converted to `1`/`0`. Special operators:
/// `between` (value is a two-element array), `set` (a single LIKE with the commas of
/// the value replaced by `%`), `in` / `notin` (array, or for `in` a comma-separated
/// scalar) and `json_contains` (LIKE on the quoted item).
///
/// NULL handling and scalar `in` values follow `where_and`: `= NULL` is rendered as
/// `IS null`, other operators leave NULL unquoted, and a non-string scalar passed to
/// `in` is rendered through its text form instead of collapsing to `in ('')`.
fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
    for f in field.split('|') {
        if !super::sql_safety::validate_field_name(f) {
            error!("Invalid field name: {}", f);
        }
    }
    if !super::sql_safety::validate_compare_orator(compare) {
        error!("Invalid compare operator: {}", compare);
    }
    let join_table = if self.params.join_table.is_empty() {
        self.params.table.clone()
    } else {
        self.params.join_table.clone()
    };
    if value.is_boolean() {
        value = if value.as_bool().unwrap_or(false) {
            1.into()
        } else {
            0.into()
        };
    }
    let clause = match compare {
        "between" => format!(
            "{}.`{}` between '{}' AND '{}'",
            join_table, field, value[0], value[1]
        ),
        "set" => {
            let tt = value.to_string().replace(",", "%");
            format!("{join_table}.`{field}` like '%{tt}%'")
        }
        "notin" => {
            let text = value
                .members()
                .map(|item| format!("'{item}'"))
                .collect::<Vec<_>>()
                .join(",");
            format!("{join_table}.`{field}` not in ({text})")
        }
        "in" => {
            let text = if value.is_array() {
                value
                    .members()
                    .map(|item| format!("'{item}'"))
                    .collect::<Vec<_>>()
                    .join(",")
            } else {
                // `to_string` (not `as_str`) so numeric scalars keep their value.
                value
                    .to_string()
                    .split(",")
                    .map(|item| format!("'{item}'"))
                    .collect::<Vec<_>>()
                    .join(",")
            };
            format!("{join_table}.`{field}` {compare} ({text})")
        }
        "json_contains" => {
            if !value.is_array() {
                let escaped = super::sql_safety::escape_string(&value.to_string());
                format!("{join_table}.`{field}` LIKE '%\"{}\"%'", escaped)
            } else if value.is_empty() {
                // An empty candidate list can never match.
                "1=0".to_string()
            } else {
                let parts: Vec<String> = value
                    .members()
                    .map(|item| {
                        let escaped = super::sql_safety::escape_string(&item.to_string());
                        format!("({join_table}.`{field}` LIKE '%\"{}\"%')", escaped)
                    })
                    .collect();
                format!("({})", parts.join(" OR "))
            }
        }
        // Comparing with the string 'null' never matches a NULL column.
        "=" if value.is_null() => {
            format!("{}.`{}` {} {}", join_table, field, "IS", value)
        }
        _ if value.is_null() => format!("{join_table}.`{field}` {compare} {value}"),
        _ => format!("{join_table}.`{field}` {compare} '{value}'"),
    };
    self.params.where_or.push(clause);
    self
}
/// Adds `expr` verbatim as an AND condition.
fn where_raw(&mut self, expr: &str) -> &mut Self {
    self.params.where_and.push(expr.to_owned());
    self
}
/// Adds the AND condition `field IN (<sub_sql>)`; `sub_sql` is inserted verbatim.
fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
    let clause = format!("`{field}` IN ({sub_sql})");
    self.params.where_and.push(clause);
    self
}
/// Adds the AND condition `field NOT IN (<sub_sql>)`; `sub_sql` is inserted verbatim.
fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
    let clause = format!("`{field}` NOT IN ({sub_sql})");
    self.params.where_and.push(clause);
    self
}
/// Adds the AND condition `EXISTS (<sub_sql>)`; `sub_sql` is inserted verbatim.
fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
    let clause = format!("EXISTS ({sub_sql})");
    self.params.where_and.push(clause);
    self
}
/// Adds the AND condition `NOT EXISTS (<sub_sql>)`; `sub_sql` is inserted verbatim.
fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
    let clause = format!("NOT EXISTS ({sub_sql})");
    self.params.where_and.push(clause);
    self
}
/// Sets the column-to-column condition `table.field_a <compare> table.field_b`,
/// both sides qualified with the main table. A previous column condition is replaced.
fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
    let table = self.params.table.as_str();
    let clause = format!(
        "{}.`{}` {} {}.`{}`",
        table, field_a, compare, table, field_b
    );
    self.params.where_column = clause;
    self
}
/// Queues the raw assignment `field_a = compare` for an update; both parts are
/// inserted verbatim.
fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
    let assignment = format!("{field_a} = {compare}");
    self.params.update_column.push(assignment);
    self
}
/// Stores the page number and the page size (`limit`) for the statement being built.
fn page(&mut self, page: i32, limit: i32) -> &mut Self {
self.params.page = page;
self.params.limit = limit;
self
}
/// Stores a plain row limit (`limit_only`), separate from the page/limit pair set by `page`.
fn limit(&mut self, count: i32) -> &mut Self {
self.params.limit_only = count;
self
}
/// Returns the values of a single column as an array, grouped by that column.
///
/// In `fetch_sql` mode the SQL text is returned instead. Columns registered through
/// `json` are parsed; anything that cannot be parsed becomes an empty array. A failed
/// query yields an empty array.
fn column(&mut self, field: &str) -> JsonValue {
    self.field(field);
    self.group(field);
    let sql = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let table = self.params.table.clone();
    self.table_info(table.as_str());
    let (state, rows) = self.query(sql);
    if !state {
        return array![];
    }
    let plain = self.params.json[field].is_empty();
    let values: Vec<JsonValue> = rows
        .members()
        .map(|row| {
            if plain {
                row[field].clone()
            } else {
                json::parse(row[field].as_str().unwrap_or("[]")).unwrap_or(array![])
            }
        })
        .collect();
    JsonValue::from(values)
}
/// Returns the number of rows matching the current statement (`0` when the query
/// fails), or the SQL text in `fetch_sql` mode.
fn count(&mut self) -> JsonValue {
    self.params.fields["count"] = JsonValue::from("count(*) as count");
    let sql = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, rows) = self.query(sql);
    if state {
        rows[0]["count"].clone()
    } else {
        JsonValue::from(0)
    }
}
/// Returns the maximum of `field`, or the SQL text in `fetch_sql` mode.
///
/// When the query yields more than one row (e.g. with grouping) all rows are
/// returned; a failed query yields `0.0`.
fn max(&mut self, field: &str) -> JsonValue {
    self.params.fields[field] = JsonValue::from(format!("max({field}) as {field}"));
    let sql = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, rows) = self.query(sql);
    if !state {
        return JsonValue::from(0.0);
    }
    if rows.len() > 1 {
        return rows;
    }
    rows[0][field].clone()
}
/// Returns the minimum of `field`, or the SQL text in `fetch_sql` mode.
///
/// When the query yields more than one row (e.g. with grouping) all rows are
/// returned; a failed query yields `0.0`.
fn min(&mut self, field: &str) -> JsonValue {
    self.params.fields[field] = format!("min({field}) as {field}").into();
    let sql = self.params.select_sql();
    // Honour `fetch_sql` like the other aggregates instead of always executing.
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, data) = self.query(sql);
    if !state {
        return JsonValue::from(0.0);
    }
    if data.len() > 1 {
        return data;
    }
    data[0][field].clone()
}
/// Returns the sum of `field`, or the SQL text in `fetch_sql` mode.
///
/// All rows are returned when the query yields more than one; the whole first row is
/// returned when more than one field is selected; a failed query yields `0`.
fn sum(&mut self, field: &str) -> JsonValue {
    self.params.fields[field] = JsonValue::from(format!("sum({field}) as {field}"));
    let sql = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, rows) = self.query(sql);
    if !state {
        return JsonValue::from(0);
    }
    if rows.len() > 1 {
        return rows;
    }
    if self.params.fields.len() > 1 {
        return rows[0].clone();
    }
    rows[0][field].clone()
}
/// Returns the average of `field`, or the SQL text in `fetch_sql` mode.
///
/// When the query yields more than one row all rows are returned; a failed query
/// yields `0`.
fn avg(&mut self, field: &str) -> JsonValue {
    self.params.fields[field] = JsonValue::from(format!("avg({field}) as {field}"));
    let sql = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, rows) = self.query(sql);
    if !state {
        return JsonValue::from(0);
    }
    if rows.len() > 1 {
        return rows;
    }
    rows[0][field].clone()
}
/// Adds `expr` verbatim to the statement's HAVING conditions.
fn having(&mut self, expr: &str) -> &mut Self {
    self.params.having.push(expr.to_owned());
    self
}
/// Runs the built SELECT and returns all rows, or the SQL text in `fetch_sql` mode.
///
/// Non-empty columns registered through `json` are parsed; text that does not parse
/// is kept as a string. A failed query is logged and yields an empty array.
fn select(&mut self) -> JsonValue {
    let sql = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let table = self.params.table.clone();
    self.table_info(table.as_str());
    let (state, mut data) = self.query(sql);
    if !state {
        error!("{data:?}");
        return array![];
    }
    for item in data.members_mut() {
        for (field, _) in self.params.json.entries() {
            if item[field].is_empty() {
                continue;
            }
            let raw = item[field].to_string();
            item[field] = match json::parse(&raw) {
                Ok(parsed) => parsed,
                Err(_) => JsonValue::from(raw),
            };
        }
    }
    data
}
/// Returns the first row matching the statement as an object, or the SQL text in
/// `fetch_sql` mode.
///
/// Columns registered through `json` are parsed; empty or unparsable ones become an
/// empty array. No match or a failed query yields an empty object.
fn find(&mut self) -> JsonValue {
    self.params.page = 1;
    self.params.limit = 1;
    let sql = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let table = self.params.table.clone();
    self.table_info(table.as_str());
    let (state, mut data) = self.query(sql);
    if !state {
        error!("{data:?}");
        return object! {};
    }
    if data.is_empty() {
        return object! {};
    }
    for (field, _) in self.params.json.entries() {
        let decoded = if data[0][field].is_empty() {
            array![]
        } else {
            json::parse(&data[0][field].to_string()).unwrap_or(array![])
        };
        data[0][field] = decoded;
    }
    data[0].clone()
}
/// Returns the value of `field` from the first matching row, or the SQL text in
/// `fetch_sql` mode.
///
/// Any previously selected fields are discarded. Columns registered through `json`
/// are parsed; empty or unparsable ones become an empty array. A failed query
/// yields `Null`.
fn value(&mut self, field: &str) -> JsonValue {
    self.params.fields = object! {};
    self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
    self.params.page = 1;
    self.params.limit = 1;
    let sql = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let table = self.params.table.clone();
    self.table_info(table.as_str());
    let (state, mut data) = self.query(sql);
    if !state {
        if self.connection.debug {
            info!("{data:?}");
        }
        return JsonValue::Null;
    }
    for (json_field, _) in self.params.json.entries() {
        let decoded = if data[0][json_field].is_empty() {
            array![]
        } else {
            json::parse(&data[0][json_field].to_string()).unwrap_or(array![])
        };
        data[0][json_field] = decoded;
    }
    data[0][field].clone()
}
/// Inserts one row built from the object `data` and returns its id, or the SQL text
/// in `fetch_sql` mode. An empty string is returned when the insert fails.
///
/// Unless `autoinc` is set, a missing `id` is generated from the current timestamp
/// (nanoseconds) and the thread number, both in hexadecimal. With `autoinc` the id
/// is read back with `max(id)`.
///
/// Text, array and object values are written as standard single-quoted SQL literals
/// with embedded `'` doubled. The previous double-quoted form stored doubled
/// apostrophes literally, broke on values containing both quote kinds, and relied on
/// SQLite's double-quoted-string fallback.
fn insert(&mut self, mut data: JsonValue) -> JsonValue {
    // Renders `text` as a single-quoted SQL string literal.
    fn quote(text: &str) -> String {
        format!("'{}'", text.replace('\'', "''"))
    }

    if !self.params.autoinc && data["id"].is_empty() {
        let thread_id = format!("{:?}", std::thread::current().id());
        let thread_num: u64 = thread_id
            .trim_start_matches("ThreadId(")
            .trim_end_matches(")")
            .parse()
            .unwrap_or(0);
        data["id"] = format!(
            "{:X}{:X}",
            Local::now().timestamp_nanos_opt().unwrap_or(0),
            thread_num
        )
        .into();
    }

    let mut fields = Vec::with_capacity(data.len());
    let mut values = Vec::with_capacity(data.len());
    for (field, value) in data.entries() {
        fields.push(format!("`{field}`"));
        let literal = if value.is_number() || value.is_boolean() || value.is_null() {
            // Numbers, booleans and NULL are written unquoted.
            value.to_string()
        } else {
            // Strings render as their raw text; arrays and objects as JSON text.
            quote(&value.to_string())
        };
        values.push(literal);
    }
    let sql = format!(
        "INSERT INTO `{}` ({}) VALUES ({});",
        self.params.table,
        fields.join(","),
        values.join(",")
    );
    if self.params.sql {
        return JsonValue::from(sql);
    }

    let (state, ids) = self.execute(sql);
    if !state {
        error!("{ids}");
        return JsonValue::from("");
    }
    if !self.params.autoinc {
        return data["id"].clone();
    }
    let (state, ids) = self.query(format!("select max(id) as id from {}", self.params.table));
    if state {
        ids[0]["id"].clone()
    } else {
        error!("{ids}");
        JsonValue::from("")
    }
}
/// Inserts every member of the `data` array with one multi-row `INSERT`.
///
/// The column list is taken from the first member. For tables that are not
/// auto-incrementing, members without an id get one generated from the current
/// nanosecond timestamp and the pool worker index (upper-case hex). Each member
/// is rendered to a `(v1,v2,...)` tuple on the worker pool. When `params.sql` is
/// set, the generated statement is returned as a string and nothing is executed.
///
/// Returns the list of ids (re-read from the table for auto-increment tables),
/// or an empty array when a statement fails.
fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
    if !self.params.autoinc && data[0]["id"].is_empty() {
        // Make sure `id` is part of the column list derived from the first member.
        data[0]["id"] = "".into();
    }
    let fields = data[0]
        .entries()
        .map(|(field, _)| format!("`{field}`"))
        .collect::<Vec<_>>()
        .join(",");

    let core_count = num_cpus::get();
    let mut p = pools::Pool::new(core_count * 4);
    let autoinc = self.params.autoinc;
    for list in data.members() {
        let mut item = list.clone();
        p.execute(move |pcindex| {
            if !autoinc && item["id"].is_empty() {
                let id = format!(
                    "{:X}{:X}",
                    Local::now().timestamp_nanos_opt().unwrap_or(0),
                    pcindex
                );
                item["id"] = id.into();
            }
            let row = item
                .entries()
                .map(|(_, value)| {
                    if value.is_number() || value.is_boolean() || value.is_null() {
                        // Bare literal; null must stay SQL NULL rather than the text 'null'.
                        value.to_string()
                    } else {
                        // Strings and serialized arrays/objects: single-quoted, quotes doubled.
                        format!("'{}'", value.to_string().replace('\'', "''"))
                    }
                })
                .collect::<Vec<_>>()
                .join(",");
            array![item["id"].clone(), format!("({row})")]
        });
    }

    let (ids_list, mut values) = p.insert_all();
    values = values.trim_start_matches(",").to_string();
    let sql = format!(
        "INSERT INTO {} ({}) VALUES {};",
        self.params.table, fields, values
    );
    if self.params.sql {
        return JsonValue::from(sql);
    }

    let (state, data) = self.execute(sql.clone());
    if !state {
        error!("批量添加失败: {data:?} {sql}");
        return array![];
    }
    if !self.params.autoinc {
        return JsonValue::from(ids_list);
    }
    // Auto-increment tables: read back the highest ids, one per inserted row.
    let (state, ids) = self.query(format!(
        "SELECT id FROM {} GROUP BY id ORDER BY id DESC LIMIT {} OFFSET 0",
        self.params.table,
        ids_list.len()
    ));
    if !state {
        error!("批量添加失败: {ids:?} {sql}");
        return array![];
    }
    let mut idlist = array![];
    for item in ids.members() {
        let _ = idlist.push(item["id"].clone());
    }
    idlist
}
/// Inserts a row, or updates the existing row when `conflict_fields` collide.
///
/// Builds `INSERT ... ON CONFLICT (<conflict_fields>) DO UPDATE SET ...`, where
/// every supplied column except the conflict columns and `id` is overwritten
/// with the incoming (`excluded`) value. If no such column remains, the
/// statement uses `DO NOTHING` instead of an empty (invalid) `SET` list.
///
/// When the table is not auto-incrementing and `data["id"]` is empty, an id is
/// generated from the nanosecond timestamp and thread number (upper-case hex).
/// When `params.sql` is set, the generated statement is returned as a string and
/// nothing is executed.
///
/// Returns the row id (`max(id)` of the table for auto-increment tables,
/// otherwise `data["id"]`), or an empty string when a statement fails.
/// NOTE(review): for auto-increment tables `max(id)` is not necessarily the row
/// touched when the conflict branch updated an existing row.
fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
    /// Renders `text` as a single-quoted SQL string literal, doubling embedded quotes.
    fn quote(text: &str) -> String {
        format!("'{}'", text.replace('\'', "''"))
    }

    if !self.params.autoinc && data["id"].is_empty() {
        // The thread number is parsed out of the `Debug` form "ThreadId(N)".
        let thread_id = format!("{:?}", std::thread::current().id());
        let thread_num: u64 = thread_id
            .trim_start_matches("ThreadId(")
            .trim_end_matches(')')
            .parse()
            .unwrap_or(0);
        data["id"] = format!(
            "{:X}{:X}",
            Local::now().timestamp_nanos_opt().unwrap_or(0),
            thread_num
        )
        .into();
    }

    let mut fields = Vec::new();
    let mut values = Vec::new();
    let mut update_set = Vec::new();
    for (field, value) in data.entries() {
        fields.push(format!("`{field}`"));
        let literal = if value.is_number() || value.is_boolean() || value.is_null() {
            // Numbers, booleans and null are written bare.
            value.to_string()
        } else {
            // Strings and serialized arrays/objects always become single-quoted
            // literals. Double-quoted tokens are identifiers in SQLite, and `''`
            // is only an escape inside single quotes.
            quote(&value.to_string())
        };
        values.push(literal);
        // Conflict columns and the id keep their stored value on update.
        if field != "id" && !conflict_fields.contains(&field) {
            update_set.push(format!("`{field}`=excluded.`{field}`"));
        }
    }

    let conflict_cols: Vec<String> = conflict_fields
        .iter()
        .map(|f| format!("`{}`", f))
        .collect();
    let conflict_action = if update_set.is_empty() {
        "DO NOTHING".to_string()
    } else {
        format!("DO UPDATE SET {}", update_set.join(","))
    };
    let sql = format!(
        "INSERT INTO `{}` ({}) VALUES ({}) ON CONFLICT ({}) {};",
        self.params.table,
        fields.join(","),
        values.join(","),
        conflict_cols.join(","),
        conflict_action
    );
    if self.params.sql {
        return JsonValue::from(sql);
    }

    let (state, result) = self.execute(sql);
    if !state {
        error!("upsert失败: {result}");
        return JsonValue::from("");
    }
    if !self.params.autoinc {
        return data["id"].clone();
    }
    let (state, ids) = self.query(format!("select max(id) as id from {}", self.params.table));
    if state {
        ids[0]["id"].clone()
    } else {
        error!("{ids}");
        JsonValue::from("")
    }
}
/// Updates the rows selected by the current `where` conditions.
///
/// Each key of `data` becomes a `` `field` = value `` assignment. Any
/// expressions registered in `params.inc_dec` are appended as further
/// assignments. When `params.sql` is set, the generated statement is returned as
/// a string and nothing is executed.
///
/// Returns the payload from `execute` on success, or `0` on failure.
fn update(&mut self, data: JsonValue) -> JsonValue {
    let mut values: Vec<String> = data
        .entries()
        .map(|(field, value)| {
            if value.is_number() || value.is_boolean() || value.is_null() {
                format!("`{field}` = {value} ")
            } else {
                // Strings and serialized arrays/objects: single-quoted with quotes
                // doubled, whether or not the field is registered as JSON.
                format!("`{}` = '{}'", field, value.to_string().replace('\'', "''"))
            }
        })
        .collect();
    for (field, value) in self.params.inc_dec.entries() {
        values.push(format!("{} = {}", field, value));
    }

    let sql = format!(
        "UPDATE `{}` SET {} {} {};",
        self.params.table.clone(),
        values.join(","),
        self.params.where_sql(),
        self.params.page_limit_sql()
    );
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, data) = self.execute(sql);
    if state {
        data
    } else {
        error!("{data}");
        JsonValue::from(0)
    }
}
/// Updates many rows in one statement, keyed by each member's `id`.
///
/// For every non-`id` column of the first member, a
/// `` `col` = CASE id WHEN '<id>' THEN <value> ... END `` assignment is built
/// across all members. The statement is restricted with an `id in (...)`
/// condition, and any expressions in `params.inc_dec` are appended as further
/// assignments. When `params.sql` is set, the generated statement is returned as
/// a string and nothing is executed.
///
/// Returns the payload from `execute` on success, or `0` on failure.
fn update_all(&mut self, data: JsonValue) -> JsonValue {
    // Collect each row's id exactly once, independent of how many columns exist.
    let ids: Vec<JsonValue> = data.members().map(|row| row["id"].clone()).collect();

    let mut values = vec![];
    for (field, _) in data[0].entries() {
        if field == "id" {
            continue;
        }
        let cases: Vec<String> = data
            .members()
            .map(|row| {
                let id = &row["id"];
                let value = &row[field];
                if value.is_number() || value.is_boolean() || value.is_null() {
                    format!("WHEN '{id}' THEN {value}")
                } else {
                    // Strings and serialized arrays/objects: single-quoted, quotes doubled.
                    format!(
                        "WHEN '{}' THEN '{}'",
                        id,
                        value.to_string().replace('\'', "''")
                    )
                }
            })
            .collect();
        values.push(format!("`{}` = CASE id {} END", field, cases.join(" ")));
    }

    self.where_and("id", "in", ids.into());
    for (field, value) in self.params.inc_dec.entries() {
        values.push(format!("{} = {}", field, value));
    }

    let sql = format!(
        "UPDATE {} SET {} {} {};",
        self.params.table.clone(),
        values.join(","),
        self.params.where_sql(),
        self.params.page_limit_sql()
    );
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, data) = self.execute(sql);
    if state {
        data
    } else {
        error!("{data:?}");
        JsonValue::from(0)
    }
}
/// Deletes the rows selected by the current `where` conditions.
///
/// When `params.sql` is set, the generated statement is returned as a string and
/// nothing is executed. Otherwise returns the payload from `execute` on success,
/// or `0` on failure.
fn delete(&mut self) -> JsonValue {
    let table = self.params.table.clone();
    let condition = self.params.where_sql();
    let sql = format!("delete FROM `{}` {};", table, condition);
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, data) = self.execute(sql);
    if state {
        return data;
    }
    error!("delete 失败>>>{data:?}");
    JsonValue::from(0)
}
/// Starts a transaction for the current thread, or nests one via a savepoint.
///
/// * Already in a transaction: the depth is incremented and `SAVEPOINT sp_<n>`
///   is issued, where `n` is the new depth; the savepoint result is ignored and
///   `true` is returned.
/// * Otherwise: the write lock is acquired (30 second timeout), a dedicated
///   read-write/no-mutex connection is opened and registered with the
///   transaction manager, and `BEGIN` is issued.
///
/// Returns `false` when the lock times out, the connection cannot be opened, or
/// `BEGIN` fails.
fn transaction(&mut self) -> bool {
    let thread_id = format!("{:?}", thread::current().id());

    if SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
        let next_depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id) + 1;
        SQLITE_TRANSACTION_MANAGER.increment_depth(&thread_id);
        let _ = self.query(format!("SAVEPOINT sp_{}", next_depth));
        return true;
    }

    let lock_timeout = Duration::from_secs(30);
    if !SQLITE_TRANSACTION_MANAGER.acquire_write_lock(&thread_id, lock_timeout) {
        error!("{thread_id} 启动事务失败: 获取写锁超时");
        return false;
    }

    let flags = OpenFlags::new().with_read_write().with_no_mutex();
    let dsn = self.connection.clone().get_dsn();
    let db = match Connect::open_thread_safe_with_flags(dsn.as_str(), flags) {
        Ok(opened) => Arc::new(opened),
        Err(e) => {
            error!("{thread_id} 启动事务失败: 打开数据库失败 {e}");
            // The lock was taken above and must not outlive the failed start.
            SQLITE_TRANSACTION_MANAGER.release_write_lock(&thread_id);
            return false;
        }
    };
    SQLITE_TRANSACTION_MANAGER.start(&thread_id, db);

    let (state, data) = self.query("BEGIN".to_string());
    if !state {
        error!("{thread_id} 启动事务失败: {data}");
        SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
        return false;
    }
    true
}
/// Commits the current thread's transaction or releases its innermost savepoint.
///
/// Returns `false` when the thread has no active transaction or `COMMIT` fails.
/// At depth greater than 1, `RELEASE SAVEPOINT sp_<depth>` is issued (its result
/// is ignored), the manager's `decrement_or_finish` is called, and `true` is
/// returned. At the outermost level the transaction is removed from the manager
/// whether or not `COMMIT` succeeded.
fn commit(&mut self) -> bool {
    let thread_id = format!("{:?}", thread::current().id());
    if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
        error!("{thread_id} 提交事务失败: 没有活跃的事务");
        return false;
    }

    let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
    if depth > 1 {
        let _ = self.query(format!("RELEASE SAVEPOINT sp_{}", depth));
        SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
        return true;
    }

    let committed = self.query("COMMIT".to_string()).0;
    SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
    if !committed {
        error!("{thread_id} 提交事务失败");
    }
    committed
}
/// Rolls back the current thread's transaction or its innermost savepoint.
///
/// Returns `false` when the thread has no active transaction or `ROLLBACK`
/// fails. At depth greater than 1, `ROLLBACK TO SAVEPOINT sp_<depth>` is issued
/// (its result is ignored), the manager's `decrement_or_finish` is called, and
/// `true` is returned. At the outermost level the transaction is removed from
/// the manager whether or not `ROLLBACK` succeeded.
fn rollback(&mut self) -> bool {
    let thread_id = format!("{:?}", thread::current().id());
    if !SQLITE_TRANSACTION_MANAGER.is_in_transaction(&thread_id) {
        error!("{thread_id} 回滚失败: 没有活跃的事务");
        return false;
    }

    let depth = SQLITE_TRANSACTION_MANAGER.get_depth(&thread_id);
    if depth > 1 {
        let _ = self.query(format!("ROLLBACK TO SAVEPOINT sp_{}", depth));
        SQLITE_TRANSACTION_MANAGER.decrement_or_finish(&thread_id, &thread_id);
        return true;
    }

    let rolled_back = self.query("ROLLBACK".to_string()).0;
    SQLITE_TRANSACTION_MANAGER.remove(&thread_id, &thread_id);
    if !rolled_back {
        error!("回滚失败: {thread_id}");
    }
    rolled_back
}
/// Runs raw SQL through `query`.
///
/// # Errors
/// Returns the failure payload rendered as a string when `query` reports failure.
fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
    match self.query(sql.to_string()) {
        (true, rows) => Ok(rows),
        (false, failure) => Err(failure.to_string()),
    }
}
/// Runs raw SQL through `execute`.
///
/// # Errors
/// Returns the failure payload rendered as a string when `execute` reports failure.
fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
    match self.execute(sql.to_string()) {
        (true, outcome) => Ok(outcome),
        (false, failure) => Err(failure.to_string()),
    }
}
fn inc(&mut self, field: &str, num: f64) -> &mut Self {
self.params.inc_dec[field] = format!("`{field}` + {num}").into();
self
}
fn dec(&mut self, field: &str, num: f64) -> &mut Self {
self.params.inc_dec[field] = format!("`{field}` - {num}").into();
self
}
/// Renders the current query as a parenthesised sub-select aliased with the
/// table name: ``( <select> ) `<table>` ``.
///
/// Calls `fetch_sql` first, then uses the string form of `select()`'s result.
/// (Presumably `fetch_sql` makes `select` return SQL text instead of executing —
/// confirm against its definition.)
fn buildsql(&mut self) -> String {
    self.fetch_sql();
    let inner = self.select().to_string();
    let alias = &self.params.table;
    format!("( {} ) `{}`", inner, alias)
}
/// Adds qualified columns to the selection, each aliased with dots replaced by
/// underscores (for example `a.b as a_b`).
///
/// Returns `self` so calls can be chained.
fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
    fields.into_iter().for_each(|field| {
        let selection = format!("{field} as {}", field.replace(".", "_"));
        self.params.fields[field] = selection.into();
    });
    self
}
/// Appends a `LEFT JOIN` clause on `main_table.main_fields = right_table.right_fields`.
///
/// An empty `main_table` falls back to the current table. `params.join_table`
/// is set to `right_table`. Returns `self` so calls can be chained.
fn join(
    &mut self,
    main_table: &str,
    main_fields: &str,
    right_table: &str,
    right_fields: &str,
) -> &mut Self {
    let main_table = match main_table {
        "" => self.params.table.clone(),
        named => named.to_string(),
    };
    let clause = format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} ");
    self.params.join_table = right_table.to_string();
    self.params.join.push(clause);
    self
}
/// Appends an `INNER JOIN` of `table` under the alias `<table>_2`.
///
/// The join condition is `<current table>.<main_fields> = <table>_2.<second_fields>`.
/// An empty `main_fields` defaults to `id`. `params.join_table` is set to the alias.
/// Returns `self` so calls can be chained.
///
/// NOTE(review): an empty `second_fields` falls back to the current table's
/// *name* as the column name — confirm this is the intended convention.
fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
    let main_fields = if !main_fields.is_empty() {
        main_fields
    } else {
        "id"
    };
    let second_fields = if !second_fields.is_empty() {
        second_fields.to_string()
    } else {
        self.params.table.clone()
    };
    let sec_table_name = format!("{}{}", table, "_2");
    let second_table = format!("{} {}", table, sec_table_name);
    let clause = format!(
        " INNER JOIN {} ON {}.{} = {}.{}",
        second_table, self.params.table, main_fields, sec_table_name, second_fields
    );
    self.params.join.push(clause);
    self.params.join_table = sec_table_name;
    self
}
/// Appends a `RIGHT JOIN` clause on `main_table.main_fields = right_table.right_fields`.
///
/// An empty `main_table` falls back to the current table. `params.join_table`
/// is set to `right_table`. Returns `self` so calls can be chained.
fn join_right(
    &mut self,
    main_table: &str,
    main_fields: &str,
    right_table: &str,
    right_fields: &str,
) -> &mut Self {
    let main_table = match main_table {
        "" => self.params.table.clone(),
        named => named.to_string(),
    };
    let clause = format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} ");
    self.params.join_table = right_table.to_string();
    self.params.join.push(clause);
    self
}
/// Appends a `FULL OUTER JOIN` clause on `main_table.main_fields = right_table.right_fields`.
///
/// An empty `main_table` falls back to the current table. `params.join_table`
/// is set to `right_table`. Returns `self` so calls can be chained.
fn join_full(
    &mut self,
    main_table: &str,
    main_fields: &str,
    right_table: &str,
    right_fields: &str,
) -> &mut Self {
    let main_table = match main_table {
        "" => self.params.table.clone(),
        named => named.to_string(),
    };
    let clause = format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} ");
    self.params.join_table = right_table.to_string();
    self.params.join.push(clause);
    self
}
/// Queues `UNION <sub_sql>` in `params.unions`.
///
/// Returns `self` so calls can be chained.
fn union(&mut self, sub_sql: &str) -> &mut Self {
    let clause = format!("UNION {sub_sql}");
    self.params.unions.push(clause);
    self
}
/// Queues `UNION ALL <sub_sql>` in `params.unions`.
///
/// Returns `self` so calls can be chained.
fn union_all(&mut self, sub_sql: &str) -> &mut Self {
    let clause = format!("UNION ALL {sub_sql}");
    self.params.unions.push(clause);
    self
}
/// Sets `params.lock_mode` to `FOR UPDATE`.
///
/// Returns `self` so calls can be chained.
/// NOTE(review): SQLite's `SELECT` has no `FOR UPDATE` clause — verify how the
/// SQL builder consumes `lock_mode` for this backend.
fn lock_for_update(&mut self) -> &mut Self {
    self.params.lock_mode = String::from("FOR UPDATE");
    self
}
/// Sets `params.lock_mode` to `FOR SHARE`.
///
/// Returns `self` so calls can be chained.
/// NOTE(review): SQLite's `SELECT` has no `FOR SHARE` clause — verify how the
/// SQL builder consumes `lock_mode` for this backend.
fn lock_for_share(&mut self) -> &mut Self {
    self.params.lock_mode = String::from("FOR SHARE");
    self
}
}