use std::fmt::Write;
use async_trait::async_trait;
use fraiseql_error::{FraiseQLError, Result};
use sqlx::{
Column, Row,
sqlite::{SqlitePool, SqlitePoolOptions, SqliteRow},
};
use super::where_generator::SqliteWhereGenerator;
use crate::{
dialect::SqliteDialect,
identifier::quote_sqlite_identifier,
order_by::append_order_by,
traits::{DatabaseAdapter, DirectMutationContext, MutationStrategy},
types::{DatabaseType, JsonbValue, PoolMetrics, sql_hints::OrderByClause},
where_clause::WhereClause,
};
#[derive(Clone)]
pub struct SqliteAdapter {
pub(super) pool: SqlitePool,
}
impl SqliteAdapter {
pub async fn new(connection_string: &str) -> Result<Self> {
Self::with_pool_size(connection_string, 5).await
}
pub async fn with_pool_config(
connection_string: &str,
min_size: u32,
max_size: u32,
) -> Result<Self> {
let pool = SqlitePoolOptions::new()
.min_connections(min_size)
.max_connections(max_size)
.connect(connection_string)
.await
.map_err(|e| FraiseQLError::ConnectionPool {
message: format!("Failed to create SQLite connection pool: {e}"),
})?;
Ok(Self { pool })
}
pub async fn with_pool_size(connection_string: &str, max_size: u32) -> Result<Self> {
let pool = SqlitePoolOptions::new()
.max_connections(max_size)
.connect(connection_string)
.await
.map_err(|e| FraiseQLError::ConnectionPool {
message: format!("Failed to create SQLite connection pool: {e}"),
})?;
sqlx::query("SELECT 1")
.fetch_one(&pool)
.await
.map_err(|e| FraiseQLError::Database {
message: format!("Failed to connect to SQLite database: {e}"),
sql_state: None,
})?;
Ok(Self { pool })
}
pub async fn in_memory() -> Result<Self> {
Self::new("sqlite::memory:").await
}
async fn execute_raw(
&self,
sql: &str,
params: Vec<serde_json::Value>,
) -> Result<Vec<JsonbValue>> {
let mut query = sqlx::query(sql);
for param in ¶ms {
query = match param {
serde_json::Value::String(s) => query.bind(s.clone()),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
query.bind(i)
} else if let Some(f) = n.as_f64() {
query.bind(f)
} else {
query.bind(n.to_string())
}
},
serde_json::Value::Bool(b) => query.bind(*b),
serde_json::Value::Null => query.bind(Option::<String>::None),
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
query.bind(param.to_string())
},
};
}
let rows: Vec<SqliteRow> =
query.fetch_all(&self.pool).await.map_err(|e| FraiseQLError::Database {
message: format!("SQLite query execution failed: {e}"),
sql_state: None,
})?;
let results = rows
.into_iter()
.map(|row| {
let data_str: String = row.try_get("data").unwrap_or_default();
let data: serde_json::Value =
serde_json::from_str(&data_str).unwrap_or(serde_json::Value::Null);
JsonbValue::new(data)
})
.collect();
Ok(results)
}
}
#[async_trait]
impl DatabaseAdapter for SqliteAdapter {
async fn execute_with_projection(
&self,
view: &str,
projection: Option<&crate::types::SqlProjectionHint>,
where_clause: Option<&WhereClause>,
limit: Option<u32>,
offset: Option<u32>,
order_by: Option<&[OrderByClause]>,
) -> Result<Vec<JsonbValue>> {
if projection.is_none() {
return self.execute_where_query(view, where_clause, limit, offset, order_by).await;
}
let projection = projection.expect("projection is Some; None case returned above");
let mut sql = format!(
"SELECT {} FROM {}",
projection.projection_template,
quote_sqlite_identifier(view)
);
let params: Vec<serde_json::Value> = if let Some(clause) = where_clause {
let generator = super::where_generator::SqliteWhereGenerator::new(SqliteDialect);
let (where_sql, where_params) = generator.generate(clause)?;
sql.push_str(" WHERE ");
sql.push_str(&where_sql);
where_params
} else {
Vec::new()
};
append_order_by(&mut sql, order_by, DatabaseType::SQLite)?;
match (limit, offset) {
(Some(lim), Some(off)) => {
write!(sql, " LIMIT {lim} OFFSET {off}").expect("write to String");
},
(Some(lim), None) => {
write!(sql, " LIMIT {lim}").expect("write to String");
},
(None, Some(off)) => {
write!(sql, " LIMIT -1 OFFSET {off}").expect("write to String");
},
(None, None) => {},
}
self.execute_raw(&sql, params).await
}
async fn execute_where_query(
&self,
view: &str,
where_clause: Option<&WhereClause>,
limit: Option<u32>,
offset: Option<u32>,
order_by: Option<&[OrderByClause]>,
) -> Result<Vec<JsonbValue>> {
let mut sql = format!("SELECT data FROM {}", quote_sqlite_identifier(view));
let mut params: Vec<serde_json::Value> = if let Some(clause) = where_clause {
let generator = SqliteWhereGenerator::new(SqliteDialect);
let (where_sql, where_params) = generator.generate(clause)?;
sql.push_str(" WHERE ");
sql.push_str(&where_sql);
where_params
} else {
Vec::new()
};
append_order_by(&mut sql, order_by, DatabaseType::SQLite)?;
match (limit, offset) {
(Some(lim), Some(off)) => {
sql.push_str(" LIMIT ? OFFSET ?");
params.push(serde_json::Value::Number(lim.into()));
params.push(serde_json::Value::Number(off.into()));
},
(Some(lim), None) => {
sql.push_str(" LIMIT ?");
params.push(serde_json::Value::Number(lim.into()));
},
(None, Some(off)) => {
sql.push_str(" LIMIT -1 OFFSET ?");
params.push(serde_json::Value::Number(off.into()));
},
(None, None) => {},
}
self.execute_raw(&sql, params).await
}
fn database_type(&self) -> DatabaseType {
DatabaseType::SQLite
}
fn supports_mutations(&self) -> bool {
true
}
fn mutation_strategy(&self) -> MutationStrategy {
MutationStrategy::DirectSql
}
async fn execute_direct_mutation(
&self,
ctx: &DirectMutationContext<'_>,
) -> Result<Vec<serde_json::Value>> {
let (sql, bind_values) = super::helpers::build_direct_mutation_sql(ctx)?;
let mut query = sqlx::query(&sql);
for val in &bind_values {
query = match val {
serde_json::Value::String(s) => query.bind(s.clone()),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
query.bind(i)
} else if let Some(f) = n.as_f64() {
query.bind(f)
} else {
query.bind(n.to_string())
}
},
serde_json::Value::Bool(b) => query.bind(*b),
serde_json::Value::Null => query.bind(Option::<String>::None),
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
query.bind(val.to_string())
},
};
}
let rows: Vec<SqliteRow> =
query.fetch_all(&self.pool).await.map_err(|e| FraiseQLError::Database {
message: format!("SQLite direct mutation failed: {e}"),
sql_state: None,
})?;
if rows.is_empty() {
return Err(FraiseQLError::Validation {
message: format!(
"Direct mutation on '{}' affected no rows — \
the target row may not exist or RLS filters rejected it",
ctx.table
),
path: None,
});
}
let status = match ctx.operation {
crate::traits::DirectMutationOp::Insert => "new",
crate::traits::DirectMutationOp::Update => "updated",
crate::traits::DirectMutationOp::Delete => "deleted",
};
let mut results = Vec::with_capacity(rows.len());
for row in &rows {
let entity = super::helpers::sqlite_row_to_json(row);
let entity_id = match ctx.operation {
crate::traits::DirectMutationOp::Insert => None,
crate::traits::DirectMutationOp::Update
| crate::traits::DirectMutationOp::Delete => {
ctx.values.first().map(|v| v.to_string().trim_matches('"').to_string())
},
};
results.push(serde_json::json!({
"status": status,
"message": null,
"entity_id": entity_id,
"entity_type": ctx.return_type,
"entity": entity,
"updated_fields": null,
"cascade": null,
"metadata": null,
}));
}
Ok(results)
}
async fn health_check(&self) -> Result<()> {
sqlx::query("SELECT 1").fetch_one(&self.pool).await.map_err(|e| {
FraiseQLError::Database {
message: format!("SQLite health check failed: {e}"),
sql_state: None,
}
})?;
Ok(())
}
#[allow(clippy::cast_possible_truncation)] fn pool_metrics(&self) -> PoolMetrics {
let size = self.pool.size();
let idle = self.pool.num_idle();
PoolMetrics {
total_connections: size,
idle_connections: idle as u32,
active_connections: size - idle as u32,
waiting_requests: 0, }
}
async fn execute_raw_query(
&self,
sql: &str,
) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
let rows: Vec<SqliteRow> =
sqlx::query(sql)
.fetch_all(&self.pool)
.await
.map_err(|e| FraiseQLError::Database {
message: format!("SQLite query execution failed: {e}"),
sql_state: None,
})?;
let results: Vec<std::collections::HashMap<String, serde_json::Value>> = rows
.into_iter()
.map(|row| {
let mut map = std::collections::HashMap::new();
for column in row.columns() {
let column_name = column.name().to_string();
let value: serde_json::Value =
if let Ok(v) = row.try_get::<i32, _>(column_name.as_str()) {
serde_json::json!(v)
} else if let Ok(v) = row.try_get::<i64, _>(column_name.as_str()) {
serde_json::json!(v)
} else if let Ok(v) = row.try_get::<f64, _>(column_name.as_str()) {
serde_json::json!(v)
} else if let Ok(v) = row.try_get::<String, _>(column_name.as_str()) {
if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&v) {
json_val
} else {
serde_json::json!(v)
}
} else if let Ok(v) = row.try_get::<bool, _>(column_name.as_str()) {
serde_json::json!(v)
} else {
serde_json::Value::Null
};
map.insert(column_name, value);
}
map
})
.collect();
Ok(results)
}
async fn execute_parameterized_aggregate(
&self,
sql: &str,
params: &[serde_json::Value],
) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
let mut query = sqlx::query(sql);
for param in params {
query = match param {
serde_json::Value::String(s) => query.bind(s.clone()),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
query.bind(i)
} else if let Some(f) = n.as_f64() {
query.bind(f)
} else {
query.bind(n.to_string())
}
},
serde_json::Value::Bool(b) => query.bind(*b),
serde_json::Value::Null => query.bind(Option::<String>::None),
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
query.bind(param.to_string())
},
};
}
let rows: Vec<SqliteRow> =
query.fetch_all(&self.pool).await.map_err(|e| FraiseQLError::Database {
message: format!("SQLite parameterized aggregate query failed: {e}"),
sql_state: None,
})?;
let results = rows
.into_iter()
.map(|row| {
let mut map = std::collections::HashMap::new();
for column in row.columns() {
let column_name = column.name().to_string();
let value: serde_json::Value =
if let Ok(v) = row.try_get::<i32, _>(column_name.as_str()) {
serde_json::json!(v)
} else if let Ok(v) = row.try_get::<i64, _>(column_name.as_str()) {
serde_json::json!(v)
} else if let Ok(v) = row.try_get::<f64, _>(column_name.as_str()) {
serde_json::json!(v)
} else if let Ok(v) = row.try_get::<String, _>(column_name.as_str()) {
if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&v) {
json_val
} else {
serde_json::json!(v)
}
} else if let Ok(v) = row.try_get::<bool, _>(column_name.as_str()) {
serde_json::json!(v)
} else {
serde_json::Value::Null
};
map.insert(column_name, value);
}
map
})
.collect();
Ok(results)
}
async fn explain_query(
&self,
sql: &str,
_params: &[serde_json::Value],
) -> Result<serde_json::Value> {
use sqlx::Row as _;
if sql.contains(';') {
return Err(FraiseQLError::Validation {
message: "EXPLAIN SQL must be a single statement".into(),
path: None,
});
}
let explain_sql = format!("EXPLAIN QUERY PLAN {sql}");
let rows: Vec<sqlx::sqlite::SqliteRow> = sqlx::query(&explain_sql)
.fetch_all(&self.pool)
.await
.map_err(|e| FraiseQLError::Database {
message: format!("SQLite EXPLAIN failed: {e}"),
sql_state: None,
})?;
let steps: Vec<serde_json::Value> = rows
.iter()
.map(|row| {
let detail: String = row.try_get("detail").unwrap_or_default();
serde_json::json!({ "detail": detail })
})
.collect();
Ok(serde_json::json!(steps))
}
}
#[cfg(test)]
mod tests;