use anyhow::Result;
use crate::{proto, BatchResult, ResultSet, Statement, Transaction};
// Process-wide counter used to hand out transaction ids. It starts at 1 and is
// advanced by one each time `Client::transaction` is called.
static TRANSACTION_IDS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
/// Database client that dispatches every operation to one of the backends
/// compiled into the crate. Each variant only exists when the corresponding
/// cargo feature is enabled.
#[derive(Debug)]
pub enum Client {
/// Backend provided by `crate::local` (feature `local_backend`).
#[cfg(feature = "local_backend")]
Local(crate::local::Client),
/// Backend provided by `crate::http`; available when at least one of the
/// `reqwest_backend`, `workers_backend` or `spin_backend` features is enabled.
#[cfg(any(
feature = "reqwest_backend",
feature = "workers_backend",
feature = "spin_backend"
))]
Http(crate::http::Client),
/// Backend provided by `crate::hrana` (feature `hrana_backend`).
#[cfg(feature = "hrana_backend")]
Hrana(crate::hrana::Client),
}
// SAFETY: NOTE(review): no justification is visible in this file. This impl asserts
// that every enabled backend client can be moved to another thread; that must hold
// for each variant of `Client` — TODO confirm against the backend implementations.
unsafe impl Send for Client {}
impl Client {
/// Sends `stmts` to the active backend as a single batch and returns the
/// backend's raw [`BatchResult`] without any post-processing.
///
/// The local backend is invoked synchronously; the HTTP and Hrana backends
/// are awaited.
///
/// # Errors
/// Returns whatever error the selected backend's `raw_batch` reports.
pub async fn raw_batch(
    &self,
    stmts: impl IntoIterator<Item = impl Into<Statement> + Send> + Send,
) -> Result<BatchResult> {
    match self {
        #[cfg(feature = "local_backend")]
        Client::Local(backend) => backend.raw_batch(stmts),
        #[cfg(any(
            feature = "reqwest_backend",
            feature = "workers_backend",
            feature = "spin_backend"
        ))]
        Client::Http(backend) => backend.raw_batch(stmts).await,
        #[cfg(feature = "hrana_backend")]
        Client::Hrana(backend) => backend.raw_batch(stmts).await,
    }
}
pub async fn batch<I: IntoIterator<Item = impl Into<Statement> + Send> + Send>(
&self,
stmts: I,
) -> Result<Vec<ResultSet>>
where
<I as IntoIterator>::IntoIter: Send,
{
let batch_results = self
.raw_batch(
std::iter::once(Statement::new("BEGIN"))
.chain(stmts.into_iter().map(|s| s.into()))
.chain(std::iter::once(Statement::new("END"))),
)
.await?;
let step_error: Option<proto::Error> = batch_results
.step_errors
.into_iter()
.skip(1)
.find(|e| e.is_some())
.flatten();
if let Some(error) = step_error {
return Err(anyhow::anyhow!(error.message));
}
let mut step_results: Vec<Result<ResultSet>> = batch_results
.step_results
.into_iter()
.skip(1) .map(|maybe_rs| {
maybe_rs
.map(ResultSet::from)
.ok_or_else(|| anyhow::anyhow!("Unexpected missing result set"))
})
.collect();
step_results.pop(); step_results.into_iter().collect::<Result<Vec<ResultSet>>>()
}
/// Blocking counterpart of [`Client::batch`]: builds the same future and drives
/// it to completion with `futures::executor::block_on`.
///
/// # Errors
/// Same as [`Client::batch`].
pub fn batch_sync<I: IntoIterator<Item = impl Into<Statement> + Send> + Send>(
    &self,
    stmts: I,
) -> Result<Vec<ResultSet>>
where
    <I as std::iter::IntoIterator>::IntoIter: std::marker::Send,
{
    let pending = self.batch(stmts);
    futures::executor::block_on(pending)
}
/// Executes a single statement on the active backend and returns its result set.
///
/// The local backend is invoked synchronously; the HTTP and Hrana backends
/// are awaited.
///
/// # Errors
/// Returns whatever error the selected backend's `execute` reports.
pub async fn execute(&self, stmt: impl Into<Statement> + Send) -> Result<ResultSet> {
    match self {
        #[cfg(feature = "local_backend")]
        Client::Local(backend) => backend.execute(stmt),
        #[cfg(any(
            feature = "reqwest_backend",
            feature = "workers_backend",
            feature = "spin_backend"
        ))]
        Client::Http(backend) => backend.execute(stmt).await,
        #[cfg(feature = "hrana_backend")]
        Client::Hrana(backend) => backend.execute(stmt).await,
    }
}
/// Blocking counterpart of [`Client::execute`], driven by
/// `futures::executor::block_on`.
///
/// # Errors
/// Same as [`Client::execute`].
pub fn execute_sync(&self, stmt: impl Into<Statement> + Send) -> Result<ResultSet> {
    let pending = self.execute(stmt);
    futures::executor::block_on(pending)
}
/// Starts a new [`Transaction`] bound to this client.
///
/// The transaction receives the next value of the process-wide
/// `TRANSACTION_IDS` counter as its id.
///
/// # Errors
/// Returns whatever error `Transaction::new` reports.
pub async fn transaction(&self) -> Result<Transaction> {
    use std::sync::atomic::Ordering;

    // `fetch_add` returns the previous value, so every call gets a distinct id.
    let tx_id = TRANSACTION_IDS.fetch_add(1, Ordering::Relaxed);
    Transaction::new(self, tx_id).await
}
/// Executes `stmt` on the active backend as part of the transaction
/// identified by `tx_id`.
///
/// The local backend is invoked synchronously; the HTTP and Hrana backends
/// are awaited.
///
/// # Errors
/// Returns whatever error the selected backend's `execute_in_transaction` reports.
pub async fn execute_in_transaction(&self, tx_id: u64, stmt: Statement) -> Result<ResultSet> {
    match self {
        #[cfg(feature = "local_backend")]
        Client::Local(backend) => backend.execute_in_transaction(tx_id, stmt),
        #[cfg(any(
            feature = "reqwest_backend",
            feature = "workers_backend",
            feature = "spin_backend"
        ))]
        Client::Http(backend) => backend.execute_in_transaction(tx_id, stmt).await,
        #[cfg(feature = "hrana_backend")]
        Client::Hrana(backend) => backend.execute_in_transaction(tx_id, stmt).await,
    }
}
/// Blocking counterpart of [`Client::execute_in_transaction`], driven by
/// `futures::executor::block_on`.
///
/// # Errors
/// Same as [`Client::execute_in_transaction`].
pub fn execute_in_transaction_sync(&self, tx_id: u64, stmt: Statement) -> Result<ResultSet> {
    let pending = self.execute_in_transaction(tx_id, stmt);
    futures::executor::block_on(pending)
}
/// Asks the active backend to commit the transaction identified by `tx_id`.
///
/// The local backend is invoked synchronously; the HTTP and Hrana backends
/// are awaited.
///
/// # Errors
/// Returns whatever error the selected backend's `commit_transaction` reports.
pub async fn commit_transaction(&self, tx_id: u64) -> Result<()> {
    match self {
        #[cfg(feature = "local_backend")]
        Client::Local(backend) => backend.commit_transaction(tx_id),
        #[cfg(any(
            feature = "reqwest_backend",
            feature = "workers_backend",
            feature = "spin_backend"
        ))]
        Client::Http(backend) => backend.commit_transaction(tx_id).await,
        #[cfg(feature = "hrana_backend")]
        Client::Hrana(backend) => backend.commit_transaction(tx_id).await,
    }
}
/// Blocking counterpart of [`Client::commit_transaction`], driven by
/// `futures::executor::block_on`.
///
/// # Errors
/// Same as [`Client::commit_transaction`].
pub fn commit_transaction_sync(&self, tx_id: u64) -> Result<()> {
    let pending = self.commit_transaction(tx_id);
    futures::executor::block_on(pending)
}
/// Asks the active backend to roll back the transaction identified by `tx_id`.
///
/// The local backend is invoked synchronously; the HTTP and Hrana backends
/// are awaited.
///
/// # Errors
/// Returns whatever error the selected backend's `rollback_transaction` reports.
pub async fn rollback_transaction(&self, tx_id: u64) -> Result<()> {
    match self {
        #[cfg(feature = "local_backend")]
        Client::Local(backend) => backend.rollback_transaction(tx_id),
        #[cfg(any(
            feature = "reqwest_backend",
            feature = "workers_backend",
            feature = "spin_backend"
        ))]
        Client::Http(backend) => backend.rollback_transaction(tx_id).await,
        #[cfg(feature = "hrana_backend")]
        Client::Hrana(backend) => backend.rollback_transaction(tx_id).await,
    }
}
/// Blocking counterpart of [`Client::rollback_transaction`], driven by
/// `futures::executor::block_on`.
///
/// # Errors
/// Same as [`Client::rollback_transaction`].
pub fn rollback_transaction_sync(&self, tx_id: u64) -> Result<()> {
    let pending = self.rollback_transaction(tx_id);
    futures::executor::block_on(pending)
}
}
impl Client {
/// Builds a [`Client`] for the backend selected by the scheme of `config.url`.
///
/// Scheme mapping (each arm only exists when its feature is enabled):
/// - `file` -> local backend (`local_backend`)
/// - `ws` / `wss` -> Hrana backend (`hrana_backend`)
/// - `libsql` -> reqwest HTTP backend, after rewriting the leading
///   `libsql://` of the URL to `https://` (`reqwest_backend`)
/// - `http` / `https` -> the first enabled HTTP backend, in the order
///   reqwest, workers, spin
/// - `workers` -> workers HTTP backend (`workers_backend`)
/// - `spin` -> spin HTTP backend (`spin_backend`)
///
/// # Errors
/// - The scheme is not handled by any enabled backend.
/// - A `libsql://` URL is not a valid URL once rewritten to `https://`.
/// - The selected backend's constructor fails.
// More than one HTTP backend can be enabled at once; their `http`/`https` arms then
// overlap and the later ones become unreachable, which is expected.
#[allow(unreachable_patterns)]
pub async fn from_config<'a>(config: Config) -> anyhow::Result<Client> {
    let scheme = config.url.scheme();
    Ok(match scheme {
        #[cfg(feature = "local_backend")]
        "file" => Client::Local(crate::local::Client::new(config.url.to_string())?),
        #[cfg(feature = "hrana_backend")]
        "ws" | "wss" => Client::Hrana(crate::hrana::Client::from_config(config).await?),
        #[cfg(feature = "reqwest_backend")]
        "libsql" => {
            let inner = crate::http::InnerClient::Reqwest(crate::reqwest::HttpClient::new());
            let mut config = config;
            // Only the scheme prefix is rewritten; any later occurrence of "libsql://"
            // (e.g. inside a query string) is left untouched.
            let https_url = config
                .url
                .as_str()
                .strip_prefix("libsql://")
                .map(|rest| format!("https://{rest}"));
            if let Some(https_url) = https_url {
                // `https` has stricter host rules than a custom scheme, so the rewritten
                // URL can be invalid; report that as an error instead of panicking.
                config.url = url::Url::parse(&https_url)?;
            }
            Client::Http(crate::http::Client::from_config(inner, config)?)
        }
        #[cfg(feature = "reqwest_backend")]
        "http" | "https" => {
            let inner = crate::http::InnerClient::Reqwest(crate::reqwest::HttpClient::new());
            Client::Http(crate::http::Client::from_config(inner, config)?)
        }
        #[cfg(feature = "workers_backend")]
        "workers" | "http" | "https" => {
            let inner = crate::http::InnerClient::Workers(crate::workers::HttpClient::new());
            Client::Http(crate::http::Client::from_config(inner, config)?)
        }
        #[cfg(feature = "spin_backend")]
        "spin" | "http" | "https" => {
            let inner = crate::http::InnerClient::Spin(crate::spin::HttpClient::new());
            Client::Http(crate::http::Client::from_config(inner, config)?)
        }
        _ => anyhow::bail!("Unknown scheme: {scheme}. Make sure your backend exists and is enabled with its feature flag"),
    })
}
/// Blocking counterpart of [`Client::from_config`], driven by
/// `futures::executor::block_on`.
///
/// # Errors
/// Same as [`Client::from_config`].
pub fn from_config_sync(config: Config) -> anyhow::Result<Client> {
    let pending = Self::from_config(config);
    futures::executor::block_on(pending)
}
/// Builds a [`Client`] from process environment variables.
///
/// - `LIBSQL_CLIENT_URL` (required): database URL, parsed and handed to
///   [`Client::from_config`].
/// - `LIBSQL_CLIENT_TOKEN` (optional): authentication token; when it cannot be
///   read, no token is used.
///
/// # Errors
/// Fails when `LIBSQL_CLIENT_URL` cannot be read, when its value is not a valid
/// URL, or when [`Client::from_config`] fails.
pub async fn from_env() -> anyhow::Result<Client> {
    let raw_url = match std::env::var("LIBSQL_CLIENT_URL") {
        Ok(value) => value,
        Err(_) => {
            return Err(anyhow::anyhow!(
                "LIBSQL_CLIENT_URL variable should point to your libSQL/sqld database"
            ))
        }
    };
    let auth_token = std::env::var("LIBSQL_CLIENT_TOKEN").ok();
    let config = Config {
        url: url::Url::parse(&raw_url)?,
        auth_token,
    };
    Self::from_config(config).await
}
/// Blocking counterpart of [`Client::from_env`], driven by
/// `futures::executor::block_on`.
///
/// # Errors
/// Same as [`Client::from_env`].
pub fn from_env_sync() -> anyhow::Result<Client> {
    let pending = Self::from_env();
    futures::executor::block_on(pending)
}
/// Builds an HTTP [`Client`] on the workers backend from the secrets of a
/// `worker::Env`.
///
/// Both the `LIBSQL_CLIENT_URL` and the `LIBSQL_CLIENT_TOKEN` secrets are
/// required here.
///
/// # Errors
/// Fails when either secret cannot be read, when the URL secret is not a valid
/// URL, or when the HTTP client cannot be built from the resulting config.
#[cfg(feature = "workers_backend")]
pub fn from_workers_env(env: &worker::Env) -> anyhow::Result<Client> {
    // Reads one secret as a string, converting the lookup error into an anyhow error.
    let read_secret = |name: &str| -> anyhow::Result<String> {
        env.secret(name)
            .map(|secret| secret.to_string())
            .map_err(|e| anyhow::anyhow!("{e}"))
    };

    let raw_url = read_secret("LIBSQL_CLIENT_URL")?;
    let token = read_secret("LIBSQL_CLIENT_TOKEN")?;
    let config = Config {
        url: url::Url::parse(&raw_url)?,
        auth_token: Some(token),
    };
    let inner = crate::http::InnerClient::Workers(crate::workers::HttpClient::new());
    let http_client = crate::http::Client::from_config(inner, config)?;
    Ok(Client::Http(http_client))
}
}
/// Connection settings consumed by `Client::from_config`.
pub struct Config {
/// Location of the database. Its scheme decides which backend
/// `Client::from_config` instantiates.
pub url: url::Url,
/// Optional authentication token; `None` when no token is supplied.
pub auth_token: Option<String>,
}