use std::pin::Pin;
use async_trait::async_trait;
use futures::Stream;
use crate::prepare::PreparedStatement;
use crate::schema::{ColumnInfo, ForeignKeyInfo, IndexInfo, TableInfo};
use crate::{OxiSqlError, Row, SqlWarning, Value};
/// Conversion of a Rust value into an owned [`Value`].
///
/// The `&[&dyn ToSqlValue]` parameter slices accepted by the [`Connection`] and
/// [`Transaction`] methods are made of implementors of this trait. Every
/// implementor must also be `Send + Sync` (supertrait bounds).
pub trait ToSqlValue: Send + Sync {
/// Returns the [`Value`] that represents `self`.
fn to_value(&self) -> Value;
}
/// An `i64` is carried unchanged inside [`Value::I64`].
impl ToSqlValue for i64 {
    fn to_value(&self) -> Value {
        let integer: i64 = *self;
        Value::I64(integer)
    }
}
/// An `i32` is losslessly widened to `i64` and stored as [`Value::I64`].
impl ToSqlValue for i32 {
    fn to_value(&self) -> Value {
        let widened: i64 = (*self).into();
        Value::I64(widened)
    }
}
/// An `f64` is carried unchanged inside [`Value::F64`].
impl ToSqlValue for f64 {
    fn to_value(&self) -> Value {
        let float: f64 = *self;
        Value::F64(float)
    }
}
/// A string slice is copied into an owned `String` held by [`Value::Text`].
impl ToSqlValue for str {
    fn to_value(&self) -> Value {
        let owned: String = self.to_owned();
        Value::Text(owned)
    }
}
/// A `String` is duplicated into [`Value::Text`]; the original is left untouched.
impl ToSqlValue for String {
    fn to_value(&self) -> Value {
        let copy: String = self.as_str().to_owned();
        Value::Text(copy)
    }
}
/// A `bool` is carried unchanged inside [`Value::Bool`].
impl ToSqlValue for bool {
    fn to_value(&self) -> Value {
        let flag: bool = *self;
        Value::Bool(flag)
    }
}
/// A byte vector is copied into [`Value::Blob`]; the original is left untouched.
impl ToSqlValue for Vec<u8> {
    fn to_value(&self) -> Value {
        let bytes: Vec<u8> = self.as_slice().to_vec();
        Value::Blob(bytes)
    }
}
/// `Some(v)` converts like `v` itself; `None` becomes [`Value::Null`].
impl<T: ToSqlValue> ToSqlValue for Option<T> {
    fn to_value(&self) -> Value {
        self.as_ref()
            .map_or(Value::Null, |inner| inner.to_value())
    }
}
/// A reference converts exactly like the value it points to. `T` may be
/// unsized, so `&str` and `&dyn ToSqlValue` are covered by this impl.
impl<T: ToSqlValue + ?Sized> ToSqlValue for &T {
    fn to_value(&self) -> Value {
        let target: &T = *self;
        T::to_value(target)
    }
}
impl ToSqlValue for Value {
fn to_value(&self) -> Value {
self.clone()
}
}
#[async_trait]
pub trait Connection: Send + Sync {
async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError>;
async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError>;
async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError>;
async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
let mut total = 0u64;
for stmt in sql.split(';') {
let trimmed = stmt.trim();
if !trimmed.is_empty() {
total += self.execute(trimmed, &[]).await?;
}
}
Ok(total)
}
async fn ping(&self) -> Result<(), OxiSqlError> {
self.query("SELECT 1", &[]).await?;
Ok(())
}
async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
let _ = sql;
Err(OxiSqlError::Other(
"prepared statements are not supported by this backend".into(),
))
}
async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
Err(OxiSqlError::Other(
"schema introspection not supported by this backend".into(),
))
}
async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
let _ = table;
Err(OxiSqlError::Other(
"schema introspection not supported by this backend".into(),
))
}
async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
let _ = table;
Err(OxiSqlError::Other(
"schema introspection not supported by this backend".into(),
))
}
async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
let _ = table;
Err(OxiSqlError::Other(
"schema introspection not supported by this backend".into(),
))
}
async fn execute_named(
&self,
sql: &str,
params: &[(&str, &dyn ToSqlValue)],
) -> Result<u64, OxiSqlError> {
let (rewritten, names) = crate::params::rewrite_named_params(sql)?;
let positional = crate::params::bind_named_params(&names, params)?;
self.execute(&rewritten, &positional).await
}
async fn query_named(
&self,
sql: &str,
params: &[(&str, &dyn ToSqlValue)],
) -> Result<Vec<Row>, OxiSqlError> {
let (rewritten, names) = crate::params::rewrite_named_params(sql)?;
let positional = crate::params::bind_named_params(&names, params)?;
self.query(&rewritten, &positional).await
}
fn last_warnings(&self) -> Vec<SqlWarning> {
Vec::new()
}
fn query_stream<'a>(
&'a self,
sql: &'a str,
params: &'a [&'a dyn ToSqlValue],
) -> Pin<Box<dyn Stream<Item = Result<Row, OxiSqlError>> + Send + 'a>> {
use futures::StreamExt;
let fut = self.query(sql, params);
Box::pin(futures::stream::once(fut).flat_map(|result| match result {
Ok(rows) => futures::stream::iter(rows.into_iter().map(Ok)).left_stream(),
Err(e) => futures::stream::once(async move { Err(e) }).right_stream(),
}))
}
}
/// Builds the error returned by the default savepoint methods of [`Transaction`].
fn savepoints_unsupported() -> OxiSqlError {
    OxiSqlError::Other("savepoints are not supported by this backend".into())
}

/// An open database transaction.
///
/// Backends must implement [`execute`](Transaction::execute),
/// [`query`](Transaction::query), [`commit`](Transaction::commit) and
/// [`rollback`](Transaction::rollback). The savepoint methods default to an
/// "unsupported" error and [`query_stream`](Transaction::query_stream) defaults
/// to an adapter over `query`.
#[async_trait]
pub trait Transaction: Send + Sync {
    /// Runs `sql` with positional `params` inside the transaction and returns
    /// the backend's `u64` result for the statement.
    async fn execute(&mut self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError>;

    /// Runs `sql` with positional `params` inside the transaction and returns
    /// all resulting rows.
    async fn query(
        &mut self,
        sql: &str,
        params: &[&dyn ToSqlValue],
    ) -> Result<Vec<Row>, OxiSqlError>;

    /// Commits the transaction, consuming it.
    async fn commit(self: Box<Self>) -> Result<(), OxiSqlError>;

    /// Rolls the transaction back, consuming it.
    async fn rollback(self: Box<Self>) -> Result<(), OxiSqlError>;

    /// Creates a savepoint called `name`.
    ///
    /// The default implementation always fails with [`OxiSqlError::Other`].
    async fn savepoint(&mut self, name: &str) -> Result<(), OxiSqlError> {
        let _ = name;
        Err(savepoints_unsupported())
    }

    /// Releases the savepoint called `name`.
    ///
    /// The default implementation always fails with [`OxiSqlError::Other`].
    async fn release_savepoint(&mut self, name: &str) -> Result<(), OxiSqlError> {
        let _ = name;
        Err(savepoints_unsupported())
    }

    /// Rolls back to the savepoint called `name`.
    ///
    /// The default implementation always fails with [`OxiSqlError::Other`].
    async fn rollback_to_savepoint(&mut self, name: &str) -> Result<(), OxiSqlError> {
        let _ = name;
        Err(savepoints_unsupported())
    }

    /// Returns the rows of a query as a stream.
    ///
    /// The default implementation fetches the full `Vec<Row>` through
    /// [`query`](Transaction::query) and then yields it row by row as `Ok`
    /// items. If the query fails, the stream yields that single `Err` and ends.
    fn query_stream<'a>(
        &'a mut self,
        sql: &'a str,
        params: &'a [&'a dyn ToSqlValue],
    ) -> Pin<Box<dyn Stream<Item = Result<Row, OxiSqlError>> + Send + 'a>> {
        use futures::{stream, StreamExt};

        let pending = self.query(sql, params);
        // `left_stream` / `right_stream` give both arms the same `Either` type.
        let rows = stream::once(pending).flat_map(|outcome| match outcome {
            Ok(fetched) => stream::iter(fetched.into_iter().map(Ok)).left_stream(),
            Err(failure) => stream::once(async move { Err(failure) }).right_stream(),
        });
        Box::pin(rows)
    }
}