use crate::{
ast::ParseSQL,
builder::{
UpdateOrInsert, merged_conditions, DeleteQuery, InsertItems, InsertQurey, SelectQuery, UpdateItem, UpdateQuery,
delete_build, insert_build, select_build, select_build_two, update_build, update_build_res,
},
sqlite_pool as pool,
};
use crate::common::{ColumnBaseInfo, OthersRes, SelectRes, SqlRespone, BuildConditionItem, DatabaseKind};
use crate::to_json::sqlite::to_json as to_json;
use serde::Serialize;
use serde_json::{json, Value as JsonValue};
use sqlx::{sqlite::SqliteRow, query, query_as, Column, Executor, FromRow, QueryBuilder, Row, Sqlite, Transaction, TypeInfo};
use std::path::Path;
/// Runs a list of raw SQL statements against the SQLite database at `database_path`.
///
/// Every statement is parsed with [`ParseSQL::parse`]. Statements whose parse result has
/// `forbidden_in_txn` set are executed first, directly on the pool; the remaining statements
/// are executed afterwards inside a single transaction that is committed at the end.
///
/// For a statement with `returns_result` set, the fetched rows are converted with `to_json`
/// and stored in the response together with columns, length, table name and count (a later
/// result-returning statement overwrites the values of an earlier one). For all other
/// statements the affected-row counts are summed into `rows_affected`.
///
/// When `return_sql` is true, the statements joined with `", "` are echoed in `sql`.
///
/// NOTE(review): statements that fail to parse are silently skipped and never executed —
/// confirm that this best-effort behavior is intended.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, a statement fails to execute, row
/// conversion fails, the count statement cannot be derived/executed, or the commit fails.
pub async fn raw_queries(
    sqls: Vec<String>,
    database_path: impl AsRef<Path>,
    return_sql: bool,
) -> anyhow::Result<SqlRespone> {
    let pool: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut data = SqlRespone::default();
    if return_sql {
        data.sql = Some(sqls.join(", "));
    }
    // Keep only the statements that parse; the parse result drives how each one is run.
    let parse_sqls: Vec<(String, ParseSQL)> = sqls
        .iter()
        .filter_map(|s| {
            let parse: Option<ParseSQL> = ParseSQL::parse(s, DatabaseKind::Sqlite).ok();
            parse.map(|p| (s.to_string(), p))
        })
        .collect();
    let (allow_in_txns, forbidden_in_txns): (Vec<_>, Vec<_>) = parse_sqls
        .iter()
        .partition(|(_, parsed)| !parsed.forbidden_in_txn);

    // Statements flagged as forbidden inside a transaction run directly on the pool.
    for (sql, parse) in forbidden_in_txns {
        if !parse.returns_result {
            let rows_affected = query(sql).execute(&pool).await?.rows_affected();
            data.rows_affected += rows_affected;
            data.is_query = false;
        } else {
            let rows = query(sql).fetch_all(&pool).await?;
            let (res, columns) = to_json(rows)?;
            let length = res.len() as u64;
            data.results = Some(res);
            data.columns = Some(columns);
            data.length = length;
            data.is_query = true;
            data.table_name = parse.table_name.clone();
            if length > 0 {
                // Use the derived count statement when one exists, else the fetched length.
                if let Some(count_sql) = ParseSQL::to_count_sql(sql, DatabaseKind::Sqlite)? {
                    let (count,): (i64,) = sqlx::query_as(&count_sql).fetch_one(&pool).await?;
                    data.count = count as u64;
                } else {
                    data.count = length;
                }
            }
        }
    }

    // Everything else runs in one transaction.
    let mut tx: Transaction<'_, Sqlite> = pool.begin().await?;
    for (sql, parse) in allow_in_txns {
        if !parse.returns_result {
            let rows_affected = query(sql).execute(&mut *tx).await?.rows_affected();
            data.rows_affected += rows_affected;
            data.is_query = false;
        } else {
            let rows = query(sql).fetch_all(&mut *tx).await?;
            let (res, columns) = to_json(rows)?;
            let length = res.len() as u64;
            data.results = Some(res);
            data.columns = Some(columns);
            data.length = length;
            data.is_query = true;
            data.table_name = parse.table_name.clone();
            if length > 0 {
                if let Some(count_sql) = ParseSQL::to_count_sql(sql, DatabaseKind::Sqlite)? {
                    let (count,): (i64,) =
                        sqlx::query_as(&count_sql).fetch_one(&mut *tx).await?;
                    data.count = count as u64;
                } else {
                    data.count = length;
                }
            }
        }
    }
    tx.commit().await?;
    Ok(data)
}
/// Runs a raw SQL query inside a transaction and returns its rows as JSON.
///
/// The statement is executed exactly once. Column metadata is taken from the first fetched
/// row via [`columns_info`], and the rows are converted with `to_json`. `count` comes from the
/// statement produced by [`ParseSQL::to_count_sql`] when one is available, otherwise it
/// equals the number of fetched rows.
///
/// When the query yields no rows, the default response (plus `sql` if `return_sql` is true)
/// is returned early, without committing the transaction.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the query or count statement fails, row
/// conversion fails, or the commit fails.
pub async fn raw_fetch(
    sql: &str,
    database_path: impl AsRef<Path>,
    return_sql: bool,
) -> anyhow::Result<SelectRes<JsonValue>> {
    let pool: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut data = SelectRes::<JsonValue>::default();
    if return_sql {
        data.sql = Some(sql.to_string());
    }
    let mut tx: Transaction<'_, Sqlite> = pool.begin().await?;
    // Fetch once and read the column metadata from the first row, instead of running the
    // statement a second time just to inspect its columns.
    let rows: Vec<SqliteRow> = query(sql).fetch_all(&mut *tx).await?;
    let columns: Vec<ColumnBaseInfo> = match rows.first() {
        Some(first_row) => columns_info(first_row)?,
        None => return Ok(data),
    };
    let (results, _) = to_json(rows)?;
    let length = results.len();
    data.results = results;
    data.length = length;
    data.columns = Some(columns);
    if length > 0 {
        if let Some(count_sql) = ParseSQL::to_count_sql(sql, DatabaseKind::Sqlite)? {
            let (count,): (i64,) = sqlx::query_as(&count_sql).fetch_one(&mut *tx).await?;
            data.count = count as usize;
        } else {
            data.count = length;
        }
    }
    tx.commit().await?;
    Ok(data)
}
/// Runs a raw SQL query on an existing pool and returns a JSON object with the keys
/// `columns`, `results`, `count` and `is_query` (always `true`).
///
/// The statement is executed exactly once. Column metadata is taken from the first fetched
/// row via [`columns_info`] (empty when there are no rows). `count` comes from the statement
/// produced by [`ParseSQL::to_count_sql`] when one is available, otherwise it equals the
/// number of fetched rows; it stays `0` when nothing was fetched.
///
/// # Errors
/// Returns an error if the query or count statement fails or row conversion fails.
pub async fn raw_fetch2(pool: &sqlx::Pool<Sqlite>, sql: &str) -> anyhow::Result<JsonValue> {
    // Fetch once; the first row is enough to describe the columns.
    let rows: Vec<SqliteRow> = query(sql).fetch_all(pool).await?;
    let columns = match rows.first() {
        Some(first_row) => columns_info(first_row)?,
        None => vec![],
    };
    let (results, _) = to_json(rows)?;
    let length = results.len();
    let mut total_count = 0;
    if length > 0 {
        if let Some(count_sql) = ParseSQL::to_count_sql(sql, DatabaseKind::Sqlite)? {
            let (count,): (i64,) = sqlx::query_as(&count_sql).fetch_one(pool).await?;
            total_count = count as u64;
        } else {
            total_count = length as u64;
        }
    }
    let output = json!({
        "columns": columns,
        "results": results,
        "count": total_count,
        "is_query": true
    });
    Ok(output)
}
/// Executes every statement in `sqls` inside one transaction and commits it.
///
/// Returns a JSON object whose `rows_affected` is the sum of the affected-row counts of all
/// statements. When `return_sql` is true the object also carries `sql`, the statements
/// joined with `","`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, any statement fails, or the commit fails.
pub async fn raw_execute_batch(
    sqls: &Vec<String>,
    database_path: impl AsRef<Path>,
    return_sql: bool,
) -> anyhow::Result<JsonValue> {
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut response: JsonValue = json!({ "rows_affected": 0 });
    if return_sql {
        response["sql"] = json!(sqls.join(","));
    }

    let mut txn: Transaction<'_, Sqlite> = db.begin().await?;
    let mut total: u64 = 0;
    for statement in sqls.iter() {
        total += query(statement).execute(&mut *txn).await?.rows_affected();
    }
    txn.commit().await?;

    response["rows_affected"] = JsonValue::Number(total.into());
    Ok(response)
}
/// Reports whether the given query yields at least one row by wrapping it in
/// `SELECT EXISTS(...)`.
///
/// Trailing semicolons and trailing whitespace are stripped from `sql` first, so input such
/// as `"SELECT 1;\n"` or `"SELECT 1 ; "` still forms a valid sub-query.
///
/// # Errors
/// Returns an error if the pool cannot be obtained or the wrapped query fails.
pub async fn raw_exists(sql: &str, database_path: impl AsRef<Path>) -> anyhow::Result<bool> {
    let pool: sqlx::Pool<Sqlite> = pool(database_path).await?;
    // A statement terminator inside the parentheses would be a syntax error; remove it even
    // when it is followed (or separated) by whitespace.
    let inner = sql.trim_end_matches(|c: char| c == ';' || c.is_whitespace());
    let sql = format!("SELECT EXISTS({})", inner);
    let (exists,): (bool,) = sqlx::query_as(&sql).fetch_one(&pool).await?;
    Ok(exists)
}
/// Executes a single raw SQL statement and returns a JSON description of the outcome.
///
/// The statement is parsed with [`ParseSQL::parse`]. If it returns rows, they are fetched and
/// the response carries `results`, `columns`, `length` and `is_query: true`; otherwise the
/// statement is executed and the response carries `rows_affected` and `is_query: false`.
/// When `return_sql` is true, `sql` holds a one-element array with the statement.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the statement cannot be parsed or
/// executed, or row conversion fails.
pub async fn raw_execute(
    sql: &str,
    database_path: impl AsRef<Path>,
    return_sql: bool,
) -> anyhow::Result<JsonValue> {
    let pool: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut data: JsonValue = json!({
        "length": 0,
        "rows_affected": 0,
        "results": [],
        "columns":[]
    });
    if return_sql {
        data["sql"] = json!(vec![sql]);
    }
    let parsed = ParseSQL::parse(sql, DatabaseKind::Sqlite)?;
    if !parsed.returns_result {
        let rows_affected = query(sql).execute(&pool).await?.rows_affected();
        data["rows_affected"] = JsonValue::Number(rows_affected.into());
        data["is_query"] = JsonValue::Bool(false);
    } else {
        let rows: Vec<SqliteRow> = query(sql).fetch_all(&pool).await?;
        let (res, columns) = to_json(rows)?;
        // Fill in the query metadata too, so both branches describe their outcome fully.
        data["length"] = json!(res.len());
        data["columns"] = json!(columns);
        data["results"] = json!(res);
        data["is_query"] = JsonValue::Bool(true);
    }
    Ok(data)
}
/// Executes a raw SQL statement that is not expected to return rows.
///
/// The returned JSON object has `length: 0`, empty `results`/`columns`, `is_query: false`
/// and `rows_affected` set to the number reported by the execution. When `return_sql` is
/// true, `sql` holds a one-element array with the statement.
///
/// # Errors
/// Returns an error if the pool cannot be obtained or the statement fails.
pub async fn raw_affect(
    sql: &str,
    database_path: impl AsRef<Path>,
    return_sql: bool,
) -> anyhow::Result<JsonValue> {
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut response: JsonValue = json!({
        "length": 0,
        "rows_affected": 0,
        "results": [],
        "columns":[]
    });
    if return_sql {
        response["sql"] = json!([sql]);
    }

    let outcome = query(sql).execute(&db).await?;
    response["is_query"] = JsonValue::Bool(false);
    response["rows_affected"] = JsonValue::Number(outcome.rows_affected().into());
    Ok(response)
}
/// Executes `sql` on a connection acquired from the pool and discards the result.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, no connection can be acquired, or the
/// execution fails.
pub async fn raw_direct(sql: &str, database_path: impl AsRef<Path>) -> anyhow::Result<()> {
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut connection = db.acquire().await?;
    // Only success or failure matters here; the execution result itself is not used.
    let _outcome = connection.execute(sql).await?;
    Ok(())
}
/// Runs a raw SQL query and decodes every returned row into `T`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the query fails, or a row cannot be
/// decoded into `T`.
pub async fn raw_fetch_as<T>(sql: &str, database_path: impl AsRef<Path>) -> anyhow::Result<Vec<T>>
where
    T: std::fmt::Debug + Serialize + Send + for<'r> FromRow<'r, SqliteRow> + Unpin + 'static,
{
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let decoded = query_as::<_, T>(sql).fetch_all(&db).await?;
    Ok(decoded)
}
/// Runs a raw SQL query and decodes a single row into `T` using `fetch_one`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the query fails (including when
/// `fetch_one` finds no row), or the row cannot be decoded into `T`.
pub async fn raw_fetch_one_as<T>(sql: &str, database_path: impl AsRef<Path>) -> anyhow::Result<T>
where
    T: std::fmt::Debug + Serialize + Send + for<'r> FromRow<'r, SqliteRow> + Unpin + 'static,
{
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let decoded = query_as::<_, T>(sql).fetch_one(&db).await?;
    Ok(decoded)
}
/// Builds and runs the query held by `query_builder`, returning all rows as JSON.
///
/// The response carries the converted rows, their number in `length`, and the column
/// metadata produced by `to_json`. When `return_sql` is true, the builder's SQL text is
/// echoed in `sql`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the query fails, or row conversion fails.
pub async fn builder_fetch(
    mut query_builder: QueryBuilder<'_, Sqlite>,
    database_path: impl AsRef<Path>,
    return_sql: bool,
) -> anyhow::Result<SelectRes<JsonValue>> {
    let mut response = SelectRes::<JsonValue>::default();
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    if return_sql {
        response.sql = Some(query_builder.sql().to_string());
    }

    let fetched = query_builder.build().fetch_all(&db).await?;
    let (records, column_info) = to_json(fetched)?;
    response.length = records.len();
    response.results = records;
    response.columns = Some(column_info);
    Ok(response)
}
/// Builds the query held by `query_builder` and decodes every returned row into `T`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the query fails, or a row cannot be
/// decoded into `T`.
pub async fn builder_fetch_as<T>(
    mut query_builder: QueryBuilder<'_, Sqlite>,
    database_path: impl AsRef<Path>,
) -> anyhow::Result<Vec<T>>
where
    T: std::fmt::Debug + Serialize + Send + Unpin + for<'r> FromRow<'r, SqliteRow> + 'static,
{
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let decoded = query_builder.build_query_as::<T>().fetch_all(&db).await?;
    Ok(decoded)
}
/// Builds the query held by `query_builder` and decodes a single row into `T` using
/// `fetch_one`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the query fails (including when
/// `fetch_one` finds no row), or the row cannot be decoded into `T`.
pub async fn builder_fetch_one_as<T>(
    mut query_builder: QueryBuilder<'_, Sqlite>,
    database_path: impl AsRef<Path>,
) -> anyhow::Result<T>
where
    T: std::fmt::Debug + Serialize + Send + Unpin + for<'r> FromRow<'r, SqliteRow> + 'static,
{
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let decoded = query_builder.build_query_as::<T>().fetch_one(&db).await?;
    Ok(decoded)
}
/// Appends ` RETURNING *` to the statement held by `query_builder`, runs it, and decodes a
/// single returned row into `T` using `fetch_one`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the statement fails (including when
/// `fetch_one` finds no row), or the row cannot be decoded into `T`.
pub async fn builder_returning<T>(
    mut query_builder: QueryBuilder<'_, Sqlite>,
    database_path: impl AsRef<Path>,
) -> anyhow::Result<T>
where
    T: std::fmt::Debug + Serialize + Send + Unpin + for<'r> FromRow<'r, SqliteRow> + 'static,
{
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    query_builder.push(" RETURNING *");
    let decoded = query_builder.build_query_as::<T>().fetch_one(&db).await?;
    Ok(decoded)
}
/// Builds and executes the statement held by `query_builder`.
///
/// Returns a JSON object with `rows_affected`; when `return_sql` is true it also carries the
/// builder's SQL text in `sql`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained or the statement fails.
pub async fn builder_execute(
    mut query_builder: QueryBuilder<'_, Sqlite>,
    database_path: impl AsRef<Path>,
    return_sql: bool,
) -> anyhow::Result<JsonValue> {
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut response = json!({ "rows_affected": 0 });
    if return_sql {
        response["sql"] = json!(query_builder.sql());
    }

    let outcome = query_builder.build().execute(&db).await?;
    response["rows_affected"] = json!(outcome.rows_affected());
    Ok(response)
}
/// Executes every builder's statement inside one transaction and commits it.
///
/// Returns the sum of the affected-row counts. `_return_sql` is accepted but not used.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, any statement fails, or the commit fails.
pub async fn builder_transaction(
    mut query_builders: Vec<QueryBuilder<'_, Sqlite>>,
    database_path: impl AsRef<Path>,
    _return_sql: bool,
) -> anyhow::Result<u64> {
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut total: u64 = 0;
    let mut txn: Transaction<'_, Sqlite> = db.begin().await?;
    for statement in &mut query_builders {
        let outcome = statement.build().execute(&mut *txn).await?;
        total += outcome.rows_affected();
    }
    txn.commit().await?;
    Ok(total)
}
/// Inserts `data` into `table_name`, or updates the rows matching `wheres` when one exists.
///
/// Inside one transaction a select built from `wheres` checks for an existing row:
/// * no row — `data` is inserted and the result has `is_update: false`, `data: None`;
/// * a row — the statement built by `update_build_res` is run and the rows it returns are
///   placed in `data`, with `is_update: true`. `ignore_update_columns` is forwarded to
///   `update_build_res`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, any statement cannot be built or
/// executed, a row cannot be decoded into `T`, or the commit fails. A failing existence
/// check is reported as an error rather than being treated as "row not found".
pub async fn upsert<T>(
    data: T,
    wheres: Option<Vec<BuildConditionItem>>,
    database_path: impl AsRef<Path>,
    table_name: &str,
    ignore_update_columns: Option<Vec<String>>,
) -> anyhow::Result<UpdateOrInsert<T>>
where
    T: std::fmt::Debug + Clone + Serialize + Send + Unpin + for<'r> FromRow<'r, SqliteRow> + 'static,
{
    let pool: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut tx = pool.begin().await?;
    let mut select_builder =
        select_build::<sqlx::Sqlite>(table_name, SelectQuery::new(table_name, &wheres))?;
    // `fetch_optional` separates "no matching row" (None) from real failures, which are
    // propagated instead of silently falling through to the insert path.
    let existing: Option<T> = select_builder
        .build_query_as::<T>()
        .fetch_optional(&mut *tx)
        .await?;
    let is_update = existing.is_some();

    let updated_rows = if is_update {
        let update_items = UpdateItem::from_value_t(data, None)?;
        let mut update_builder = update_build_res::<sqlx::Sqlite>(
            table_name,
            wheres,
            update_items,
            ignore_update_columns,
        )?;
        let rows = update_builder
            .build_query_as::<T>()
            .fetch_all(&mut *tx)
            .await?;
        Some(rows)
    } else {
        let insert_items = InsertItems::from_value_t(data)?;
        let mut insert_builder = insert_build::<sqlx::Sqlite>(table_name, insert_items)?;
        insert_builder.build().execute(&mut *tx).await?;
        None
    };

    tx.commit().await?;
    Ok(UpdateOrInsert { is_update, data: updated_rows })
}
/// Inserts or updates every item of `data_list` inside one transaction.
///
/// For each item, `wheres_builder` produces the conditions used to look for an existing row.
/// When none is found the item is inserted, otherwise the matching rows are updated with the
/// item (`ignore_update_columns` is forwarded to `update_build`).
///
/// Returns a tuple of:
/// * the number of processed items (incremented once per item, not taken from the
///   database's affected-row counts), and
/// * one JSON object per item with `is_create` and the item itself under `data`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, any statement cannot be built or
/// executed, or the commit fails.
pub async fn upsert_batch<T>(
    data_list: Vec<T>,
    wheres_builder: impl Fn(&T) -> Option<Vec<BuildConditionItem>> + Send + Sync,
    database_path: impl AsRef<Path>,
    table_name: &str,
    ignore_update_columns: Option<Vec<String>>,
) -> anyhow::Result<(i32, Vec<JsonValue>)>
where
    T: std::fmt::Debug + Clone + Serialize + Send + Unpin + for<'r> FromRow<'r, SqliteRow> + 'static,
{
    let pool: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut tx = pool.begin().await?;
    let mut results = Vec::with_capacity(data_list.len());
    let mut rows_affected = 0;
    for data in data_list {
        let wheres = wheres_builder(&data);
        let mut select_builder =
            select_build::<Sqlite>(table_name, SelectQuery::new(table_name, &wheres))?;
        // Only existence matters, so stop at the first matching row instead of loading all.
        let existing: Option<SqliteRow> =
            select_builder.build().fetch_optional(&mut *tx).await?;
        let is_create = existing.is_none();
        if is_create {
            let insert_items = InsertItems::from_value_t(data.clone())?;
            let mut insert_builder = insert_build::<Sqlite>(table_name, insert_items)?;
            insert_builder.build().execute(&mut *tx).await?;
        } else {
            let update_items = UpdateItem::from_value_t(data.clone(), None)?;
            let mut update_builder = update_build::<Sqlite>(
                table_name,
                wheres,
                update_items,
                ignore_update_columns.clone(),
            )?;
            update_builder.build().execute(&mut *tx).await?;
        }
        rows_affected += 1;
        results.push(json!({
            "is_create": is_create,
            "data": data
        }));
    }
    tx.commit().await?;
    Ok((rows_affected, results))
}
/// Runs a built SELECT on `table_name` and returns the rows as JSON together with the count
/// produced by the companion count query.
///
/// When `add_conditions` is present it is merged into `query_data.wheres` with
/// `merged_conditions`. The select and count statements come from `select_build_two` and run
/// inside the same transaction.
///
/// When at least one row is fetched, the response carries `results`, `length`, `columns`,
/// `count`, `sql` and `table_name`; otherwise the default response is returned.
///
/// # Errors
/// Returns an error if the statements cannot be built, the pool cannot be obtained, a
/// statement fails, row conversion fails, or the commit fails.
pub async fn select(
    database_path: impl AsRef<Path>,
    table_name: &str,
    mut query_data: SelectQuery,
    add_conditions: Option<Vec<BuildConditionItem>>,
) -> anyhow::Result<SelectRes<JsonValue>> {
    let mut data = SelectRes::<JsonValue>::default();
    if add_conditions.is_some() {
        query_data.wheres = merged_conditions(&query_data.wheres, &add_conditions);
    }
    let (mut select_build, mut count_build) = select_build_two::<Sqlite>(table_name, query_data)?;
    let sql = select_build.sql().to_string();
    let select_builder = select_build.build();
    let pool: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut tx: Transaction<'_, Sqlite> = pool.begin().await?;
    let rows = select_builder.fetch_all(&mut *tx).await?;
    if !rows.is_empty() {
        let count_builder = count_build.build_query_as();
        let row: (i64,) = count_builder.fetch_one(&mut *tx).await?;
        let (json_data, columns) = to_json(rows)?;
        // Report the number of returned rows, as the other fetch helpers in this file do.
        data.length = json_data.len();
        data.results = json_data;
        data.columns = Some(columns);
        data.count = row.0 as usize;
        data.sql = Some(sql);
        data.table_name = Some(table_name.to_string())
    }
    tx.commit().await?;
    Ok(data)
}
pub async fn select_json(
database_path: impl AsRef<Path>,
table_name: &str,
mut query_data: SelectQuery,
add_conditions: Option<Vec<BuildConditionItem>>,
) -> anyhow::Result<SelectRes<JsonValue>> {
let mut data = SelectRes::<JsonValue>::default();
if add_conditions.is_some() {
query_data.wheres = merged_conditions(&query_data.wheres, &add_conditions);
}
let (mut select_build, mut count_build) = select_build_two::<Sqlite>(table_name, query_data)?;
let sql = select_build.sql().to_string();
let select_builder = select_build.build();
let pool: sqlx::Pool<Sqlite> = pool(database_path).await?;
let mut tx: Transaction<'_, Sqlite> = pool.begin().await?;
let rows = select_builder.fetch_all(&mut *tx).await?;
let length = rows.len();
if length > 0 {
let count_builder = count_build.build_query_as();
let row: (i64,) = count_builder.fetch_one(&mut *tx).await?;
let (json_data, columns) = to_json(rows)?;
data.results = json_data;
data.columns = Some(columns);
data.count = row.0 as usize;
data.sql = Some(sql);
data.table_name = Some(table_name.to_string())
}
tx.commit().await?;
Ok(data)
}
/// Builds and executes an UPDATE on `table_name`.
///
/// When `add_conditions` is present it is merged into `update_options.wheres` with
/// `merged_conditions`. `ignore_columns` is forwarded to `update_build`. The affected-row
/// count is wrapped with `OthersRes::new`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the statement cannot be built, or its
/// execution fails.
pub async fn update(
    database_path: impl AsRef<Path>,
    table_name: &str,
    mut update_options: UpdateQuery,
    add_conditions: Option<Vec<BuildConditionItem>>,
    ignore_columns: Option<Vec<String>>,
) -> anyhow::Result<OthersRes<JsonValue>> {
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    if add_conditions.is_some() {
        update_options.wheres = merged_conditions(&update_options.wheres, &add_conditions);
    }
    let mut statement = update_build::<Sqlite>(
        table_name,
        update_options.wheres,
        update_options.data,
        ignore_columns,
    )?;
    let affected: u64 = statement.build().execute(&db).await?.rows_affected();
    Ok(OthersRes::new(affected))
}
/// Builds an UPDATE on `table_name`, appends ` RETURNING *`, and decodes every returned row
/// into `T`.
///
/// When `add_conditions` is present it is merged into `update_options.wheres` with
/// `merged_conditions`. `ignore_columns` is forwarded to `update_build`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the statement cannot be built, its
/// execution fails, or a row cannot be decoded into `T`.
pub async fn update_returning<T>(
    database_path: impl AsRef<Path>,
    table_name: &str,
    mut update_options: UpdateQuery,
    add_conditions: Option<Vec<BuildConditionItem>>,
    ignore_columns: Option<Vec<String>>,
) -> anyhow::Result<Vec<T>>
where
    T: std::fmt::Debug + Serialize + Send + for<'r> FromRow<'r, SqliteRow> + Unpin + 'static,
{
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    if add_conditions.is_some() {
        update_options.wheres = merged_conditions(&update_options.wheres, &add_conditions);
    }
    let mut statement = update_build::<Sqlite>(
        table_name,
        update_options.wheres,
        update_options.data,
        ignore_columns,
    )?;
    statement.push(" RETURNING *");
    let returned = statement.build_query_as::<T>().fetch_all(&db).await?;
    Ok(returned)
}
/// Builds and executes a DELETE on `table_name`.
///
/// When `add_conditions` is present it is merged into `delete_options.wheres` with
/// `merged_conditions`. The affected-row count is wrapped with `OthersRes::new`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the statement cannot be built, or its
/// execution fails.
pub async fn delete(
    database_path: impl AsRef<Path>,
    table_name: &str,
    mut delete_options: DeleteQuery,
    add_conditions: Option<Vec<BuildConditionItem>>,
) -> anyhow::Result<OthersRes<JsonValue>> {
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    if add_conditions.is_some() {
        delete_options.wheres = merged_conditions(&delete_options.wheres, &add_conditions);
    }
    let mut statement = delete_build::<Sqlite>(table_name, delete_options.wheres)?;
    let affected = statement.build().execute(&db).await?.rows_affected();
    Ok(OthersRes::new(affected))
}
/// Builds a DELETE on `table_name`, appends ` RETURNING *`, and decodes every returned row
/// into `T`.
///
/// When `add_conditions` is present it is merged into `delete_options.wheres` with
/// `merged_conditions`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the statement cannot be built, its
/// execution fails, or a row cannot be decoded into `T`.
pub async fn delete_returning<T>(
    database_path: impl AsRef<Path>,
    table_name: &str,
    mut delete_options: DeleteQuery,
    add_conditions: Option<Vec<BuildConditionItem>>,
) -> anyhow::Result<Vec<T>>
where
    T: std::fmt::Debug + Serialize + Send + for<'r> FromRow<'r, SqliteRow> + Unpin + 'static,
{
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    if add_conditions.is_some() {
        delete_options.wheres = merged_conditions(&delete_options.wheres, &add_conditions);
    }
    let mut statement = delete_build::<Sqlite>(table_name, delete_options.wheres)?;
    statement.push(" RETURNING *");
    let returned = statement.build_query_as::<T>().fetch_all(&db).await?;
    Ok(returned)
}
/// Builds and executes an INSERT of `insert_options.data` into `table_name`.
///
/// The affected-row count is wrapped with `OthersRes::new`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the statement cannot be built, or its
/// execution fails.
pub async fn insert(
    database_path: impl AsRef<Path>,
    table_name: &str,
    insert_options: InsertQurey,
) -> anyhow::Result<OthersRes<JsonValue>> {
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut statement = insert_build::<Sqlite>(table_name, insert_options.data)?;
    let affected = statement.build().execute(&db).await?.rows_affected();
    Ok(OthersRes::new(affected))
}
/// Builds an INSERT of `insert_options.data` into `table_name`, appends ` RETURNING *`, and
/// decodes every returned row into `T`.
///
/// # Errors
/// Returns an error if the pool cannot be obtained, the statement cannot be built, its
/// execution fails, or a row cannot be decoded into `T`.
pub async fn insert_returning<T>(
    database_path: impl AsRef<Path>,
    table_name: &str,
    insert_options: InsertQurey,
) -> anyhow::Result<Vec<T>>
where
    T: std::fmt::Debug + Serialize + Send + for<'r> FromRow<'r, SqliteRow> + Unpin + 'static,
{
    let db: sqlx::Pool<Sqlite> = pool(database_path).await?;
    let mut statement = insert_build::<Sqlite>(table_name, insert_options.data)?;
    statement.push(" RETURNING *");
    let returned = statement.build_query_as::<T>().fetch_all(&db).await?;
    Ok(returned)
}
/// Describes the columns of `row`: for each column its name, the name of its type info, and
/// its ordinal position. The result is always `Ok`.
pub fn columns_info(row: &SqliteRow) -> anyhow::Result<Vec<ColumnBaseInfo>> {
    let described: Vec<ColumnBaseInfo> = row
        .columns()
        .iter()
        .map(|column| ColumnBaseInfo {
            name: column.name().to_string(),
            r#type: column.type_info().name().to_string(),
            index: column.ordinal() as u64,
        })
        .collect();
    Ok(described)
}