mod rows;
use crate::hrana::transaction::{HttpTransaction, TxScopeCounter};
use crate::hrana::unwrap_err;
use crate::{
hrana::{connection::HttpConnection, HttpSend},
params::IntoParams,
TransactionBehavior,
};
use libsql_hrana::proto::{Batch, Stmt};
pub use crate::wasm::rows::Rows;
cfg_cloudflare! {
mod cloudflare;
pub use cloudflare::CloudflareSender;
}
#[derive(Debug, Clone)]
pub struct Connection<T>
where
T: HttpSend,
{
conn: HttpConnection<T>,
}
cfg_cloudflare! {
impl Connection<CloudflareSender> {
pub fn open_cloudflare_worker(url: impl Into<String>, auth_token: impl Into<String>) -> Self {
Connection {
conn: HttpConnection::new(url.into(), auth_token.into(), CloudflareSender::new()),
}
}
}
}
impl<T> Connection<T>
where
T: HttpSend,
<T as HttpSend>::Stream: 'static,
{
pub async fn execute(&self, sql: &str, params: impl IntoParams) -> crate::Result<u64> {
tracing::trace!("executing `{}`", sql);
let mut stmt = crate::hrana::Statement::new(
self.conn.current_stream().clone(),
sql.to_string(),
true,
)?;
let rows = stmt.execute(¶ms.into_params()?).await?;
Ok(rows as u64)
}
pub async fn execute_batch(&self, sql: &str) -> crate::Result<()> {
let mut stmts = Vec::new();
let parse = crate::parser::Statement::parse(sql);
let mut c = TxScopeCounter::default();
for s in parse {
let s = s?;
c.count(s.kind);
stmts.push(Stmt::new(s.stmt, false));
}
let stream = self.conn.current_stream();
let in_tx_scope = !stream.is_autocommit() || c.begin_tx();
let close = !in_tx_scope || c.end_tx();
let res = stream
.batch_inner(Batch::from_iter(stmts), close)
.await
.map_err(|e| crate::Error::Hrana(e.into()))?;
unwrap_err(&res)
}
pub async fn query(&self, sql: &str, params: impl IntoParams) -> crate::Result<Rows> {
tracing::trace!("querying `{}`", sql);
let mut stmt = crate::hrana::Statement::new(
self.conn.current_stream().clone(),
sql.to_string(),
true,
)?;
let rows = stmt.query_raw(¶ms.into_params()?).await?;
Ok(Rows {
inner: Box::new(rows),
})
}
pub async fn transaction(
&self,
tx_behavior: TransactionBehavior,
) -> crate::Result<Transaction<T>> {
let stream = self.conn.open_stream();
let tx = HttpTransaction::open(stream, tx_behavior)
.await
.map_err(|e| crate::Error::Hrana(e.into()))?;
Ok(Transaction { inner: tx })
}
}
#[derive(Debug, Clone)]
pub struct Transaction<T>
where
T: HttpSend,
{
inner: HttpTransaction<T>,
}
impl<T> Transaction<T>
where
T: HttpSend,
<T as HttpSend>::Stream: 'static,
{
pub async fn query(&self, sql: &str, params: impl IntoParams) -> crate::Result<Rows> {
tracing::trace!("querying `{}`", sql);
let stream = self.inner.stream().clone();
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?;
let rows = stmt.query_raw(¶ms.into_params()?).await?;
Ok(Rows {
inner: Box::new(rows),
})
}
pub async fn execute(&self, sql: &str, params: impl IntoParams) -> crate::Result<u64> {
tracing::trace!("executing `{}`", sql);
let stream = self.inner.stream().clone();
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?;
let rows = stmt.execute(¶ms.into_params()?).await?;
Ok(rows as u64)
}
pub async fn execute_batch(&self, sql: &str) -> crate::Result<()> {
let mut statements = Vec::new();
let stmts = crate::parser::Statement::parse(sql);
let mut c = TxScopeCounter::default();
for s in stmts {
let s = s?;
c.count(s.kind);
statements.push(crate::hrana::proto::Stmt::new(s.stmt, false));
}
let stream = self.inner.stream();
let in_tx_scope = !stream.is_autocommit() || c.begin_tx();
let close = !in_tx_scope || c.end_tx();
stream
.batch_inner(Batch::from_iter(statements), close)
.await
.map_err(|e| crate::Error::Hrana(e.into()))?;
Ok(())
}
pub async fn commit(&mut self) -> crate::Result<()> {
self.inner
.commit()
.await
.map_err(|e| crate::Error::Hrana(e.into()))
}
pub async fn rollback(&mut self) -> crate::Result<()> {
self.inner
.rollback()
.await
.map_err(|e| crate::Error::Hrana(e.into()))
}
}