use std::future::Future;
use std::sync::Arc;
use sqlw::{FromRow, Query, RowCell, RowError, RowLike, Value};
use crate::error::DBError;
/// Borrowed view pairing a `turso::Row` with the column names of the
/// statement that produced it, so that cells can be looked up by name
/// through the `RowLike` implementation below.
pub struct TursoRowRef<'a> {
/// The row whose values are read via `get_value(index)`.
row: &'a turso::Row,
/// Column names; the position of a name in this slice is used as the
/// value index into `row`.
column_names: &'a [String],
}
impl<'a> TursoRowRef<'a> {
pub fn new(row: &'a turso::Row, column_names: &'a [String]) -> Self {
Self { row, column_names }
}
}
impl<'a> RowLike for TursoRowRef<'a> {
    /// Looks up the cell stored under the column called `name`.
    ///
    /// The name is resolved to an index by its position in `column_names`;
    /// the value at that index is then converted into a `sqlw::Value` and
    /// returned as `RowCell::Owned`.
    ///
    /// # Errors
    ///
    /// * `RowError::ColumnNotFound` when no column name equals `name`.
    /// * `RowError::Any` (carrying the error's string form) when the row
    ///   fails to yield a value for the resolved index.
    fn cell<'b>(&'b self, name: &str) -> Result<RowCell<'b>, RowError> {
        // Resolve the column name to its positional index first.
        let Some(index) = self.column_names.iter().position(|column| column == name) else {
            return Err(RowError::ColumnNotFound { name: name.into() });
        };

        let raw = match self.row.get_value(index) {
            Ok(raw) => raw,
            Err(source) => return Err(RowError::Any(source.to_string())),
        };

        // One-to-one mapping of the driver value onto the sqlw value.
        let converted = match raw {
            turso::Value::Text(text) => Value::Text(text),
            turso::Value::Integer(number) => Value::Int(number),
            turso::Value::Real(number) => Value::Float(number),
            turso::Value::Blob(bytes) => Value::Blob(bytes),
            turso::Value::Null => Value::Null,
        };
        Ok(RowCell::Owned(converted))
    }
}
/// Query executor backed by a single `turso::Connection` held behind an
/// `Arc`; the `sqlw::QueryExecutor` implementation clones this handle into
/// the futures it returns.
#[derive(Debug)]
pub struct TursoExecutor {
/// Reference-counted handle to the underlying connection.
connection: Arc<::turso::Connection>,
}
/// Wrapper around a `turso` transaction, created by
/// `TursoExecutor::transaction`. Queries are run through it and it is
/// finished by the consuming `commit` or `rollback` methods.
#[derive(Debug)]
pub struct TursoTransaction<'conn> {
/// The wrapped driver transaction.
tx: ::turso::transaction::Transaction<'conn>,
}
impl TursoExecutor {
    /// Creates an executor from a connection factory.
    ///
    /// `connector` is called once and the future it returns is awaited; the
    /// resulting connection is stored behind an `Arc`.
    ///
    /// # Errors
    ///
    /// Returns `DBError::Connection` when the connector's future resolves to
    /// an error.
    pub async fn new<F, Fut>(connector: F) -> Result<Self, DBError>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<::turso::Connection, ::turso::Error>> + Send + 'static,
    {
        match connector().await {
            Ok(connection) => Ok(Self {
                connection: Arc::new(connection),
            }),
            Err(source) => Err(DBError::Connection(source.into())),
        }
    }

    /// Borrows the underlying connection.
    pub fn conn(&self) -> &::turso::Connection {
        self.connection.as_ref()
    }

    /// Starts a transaction by calling `unchecked_transaction` on the
    /// connection and wraps it in a [`TursoTransaction`] that borrows from
    /// this executor.
    ///
    /// # Errors
    ///
    /// Returns `DBError::Transaction` when the transaction cannot be started.
    pub fn transaction(&self) -> impl Future<Output = Result<TursoTransaction<'_>, DBError>> {
        async move {
            match self.connection.unchecked_transaction().await {
                Ok(tx) => Ok(TursoTransaction { tx }),
                Err(source) => Err(DBError::Transaction(source.into())),
            }
        }
    }
}
impl sqlw::QueryExecutor for TursoExecutor {
type Error = DBError;
fn query_void(&self, query: Query) -> impl Future<Output = Result<(), DBError>> {
let connection = Arc::clone(&self.connection);
async move { execute_query_void(connection.as_ref(), query).await }
}
fn query_one<T: FromRow + Send + 'static>(
&self,
query: Query,
) -> impl Future<Output = Result<Option<T>, DBError>> {
let connection = Arc::clone(&self.connection);
async move { execute_query_one(connection.as_ref(), query).await }
}
fn query_list<T: FromRow + Send + 'static>(
&self,
query: Query,
) -> impl Future<Output = Result<Vec<T>, DBError>> {
let connection = Arc::clone(&self.connection);
async move { execute_query_list(connection.as_ref(), query).await }
}
fn batch(&self, scripts: &[fn() -> Query]) -> impl Future<Output = Result<(), DBError>> {
async move {
let tx = self.transaction().await?;
for script in scripts {
if let Err(err) = tx.query_void(script()).await {
let _ = tx.rollback().await;
return Err(err);
}
}
tx.commit().await
}
}
}
impl TursoTransaction<'_> {
    /// Borrows the wrapped driver transaction.
    pub fn raw(&self) -> &::turso::transaction::Transaction<'_> {
        let Self { tx } = self;
        tx
    }

    /// Executes `query` through this transaction, discarding the driver's
    /// return value.
    pub async fn query_void(&self, query: Query) -> Result<(), DBError> {
        let conn: &::turso::Connection = &self.tx;
        execute_query_void(conn, query).await
    }

    /// Executes `query` through this transaction and decodes the first
    /// returned row, if any, as `T`.
    pub async fn query_one<T: FromRow + Send + 'static>(
        &self,
        query: Query,
    ) -> Result<Option<T>, DBError> {
        let conn: &::turso::Connection = &self.tx;
        execute_query_one(conn, query).await
    }

    /// Executes `query` through this transaction and decodes every returned
    /// row as `T`.
    pub async fn query_list<T: FromRow + Send + 'static>(
        &self,
        query: Query,
    ) -> Result<Vec<T>, DBError> {
        let conn: &::turso::Connection = &self.tx;
        execute_query_list(conn, query).await
    }

    /// Consumes the wrapper and commits the transaction.
    ///
    /// # Errors
    ///
    /// Returns `DBError::Transaction` when the driver reports a commit error.
    pub async fn commit(self) -> Result<(), DBError> {
        let Self { tx } = self;
        match tx.commit().await {
            Ok(()) => Ok(()),
            Err(source) => Err(DBError::Transaction(source.into())),
        }
    }

    /// Consumes the wrapper and rolls the transaction back.
    ///
    /// # Errors
    ///
    /// Returns `DBError::Transaction` when the driver reports a rollback
    /// error.
    pub async fn rollback(self) -> Result<(), DBError> {
        let Self { tx } = self;
        match tx.rollback().await {
            Ok(()) => Ok(()),
            Err(source) => Err(DBError::Transaction(source.into())),
        }
    }
}
/// Splits `query` into SQL text and arguments, converts the arguments to
/// driver values and executes the statement on `connection`.
///
/// The value returned by the driver's `execute` is dropped.
///
/// # Errors
///
/// Returns `DBError::Execution` when the driver reports an error.
async fn execute_query_void(connection: &::turso::Connection, query: Query) -> Result<(), DBError> {
    let (sql, params) = query.split();
    let bound = params
        .into_iter()
        .map(sqlw_to_turso_value)
        .collect::<Vec<::turso::Value>>();

    match connection.execute(&sql, bound).await {
        Ok(_) => Ok(()),
        Err(source) => Err(DBError::Execution(source.into())),
    }
}
/// Runs `query` on `connection` and decodes the first row, if there is one.
///
/// The statement is prepared, its column names are collected, and the query
/// is run with the converted arguments. Only the first row is fetched; it is
/// wrapped in a [`TursoRowRef`] and handed to `T::from_row`.
///
/// Returns `Ok(None)` when the query yields no rows.
///
/// # Errors
///
/// Returns `DBError::Execution` when preparing, querying or fetching fails,
/// and propagates (via `?`) any error produced by `T::from_row`.
async fn execute_query_one<T: FromRow + Send + 'static>(
    connection: &::turso::Connection,
    query: Query,
) -> Result<Option<T>, DBError> {
    let (sql, params) = query.split();
    let bound = params
        .into_iter()
        .map(sqlw_to_turso_value)
        .collect::<Vec<::turso::Value>>();

    let mut statement = connection
        .prepare(&sql)
        .await
        .map_err(|source| DBError::Execution(source.into()))?;

    // Capture the column names up front so rows can be addressed by name.
    let mut column_names: Vec<String> = Vec::new();
    for column in statement.columns().iter() {
        column_names.push(column.name().to_string());
    }

    let mut rows = statement
        .query(bound)
        .await
        .map_err(|source| DBError::Execution(source.into()))?;

    let first = rows
        .next()
        .await
        .map_err(|source| DBError::Execution(source.into()))?;

    match first {
        None => Ok(None),
        Some(row) => {
            let view = TursoRowRef::new(&row, &column_names);
            let decoded = T::from_row(&view)?;
            Ok(Some(decoded))
        }
    }
}
/// Runs `query` on `connection` and decodes every returned row as `T`.
///
/// The statement is prepared, its column names are collected, and the query
/// is run with the converted arguments. Each fetched row is wrapped in a
/// [`TursoRowRef`] and handed to `T::from_row`; results are returned in the
/// order the rows were fetched.
///
/// # Errors
///
/// Returns `DBError::Execution` when preparing, querying or fetching fails,
/// and propagates (via `?`) the first error produced by `T::from_row`.
async fn execute_query_list<T: FromRow + Send + 'static>(
    connection: &::turso::Connection,
    query: Query,
) -> Result<Vec<T>, DBError> {
    let (sql, params) = query.split();
    let bound = params
        .into_iter()
        .map(sqlw_to_turso_value)
        .collect::<Vec<::turso::Value>>();

    let mut statement = connection
        .prepare(&sql)
        .await
        .map_err(|source| DBError::Execution(source.into()))?;

    // Capture the column names up front so rows can be addressed by name.
    let mut column_names: Vec<String> = Vec::new();
    for column in statement.columns().iter() {
        column_names.push(column.name().to_string());
    }

    let mut rows = statement
        .query(bound)
        .await
        .map_err(|source| DBError::Execution(source.into()))?;

    let mut decoded = Vec::new();
    loop {
        let fetched = rows
            .next()
            .await
            .map_err(|source| DBError::Execution(source.into()))?;
        // `None` signals that the result set is exhausted.
        let Some(row) = fetched else {
            break;
        };
        let view = TursoRowRef::new(&row, &column_names);
        decoded.push(T::from_row(&view)?);
    }
    Ok(decoded)
}
fn sqlw_to_turso_value(value: Value) -> ::turso::Value {
match value {
Value::Text(s) => ::turso::Value::Text(s),
Value::Int(i) => ::turso::Value::Integer(i),
Value::Float(f) => ::turso::Value::Real(f),
Value::Bool(b) => ::turso::Value::Integer(i64::from(b)),
Value::Blob(b) => ::turso::Value::Blob(b),
Value::Null => ::turso::Value::Null,
}
}