use std::{fmt, str::FromStr};
use async_trait::async_trait;
#[cfg(feature = "metrics")]
use prisma_metrics::guards::GaugeGuard;
#[cfg(not(feature = "metrics"))]
#[derive(Default)]
pub struct GaugeGuard;
#[cfg(not(feature = "metrics"))]
impl GaugeGuard {
pub fn increment(_: &str) -> Self {
Self
}
pub fn decrement(&self) {}
}
use super::*;
use crate::{
ast::*,
error::{Error, ErrorKind},
};
#[async_trait]
pub trait Transaction: Queryable {
async fn commit(&self) -> crate::Result<()>;
async fn rollback(&self) -> crate::Result<()>;
fn as_queryable(&self) -> &dyn Queryable;
}
#[cfg(any(
feature = "sqlite-native",
feature = "mssql-native",
feature = "postgresql-native",
feature = "mysql-native"
))]
pub(crate) struct TransactionOptions {
pub(crate) isolation_level: Option<IsolationLevel>,
pub(crate) isolation_first: bool,
}
#[cfg(any(
feature = "sqlite-native",
feature = "mssql-native",
feature = "postgresql-native",
feature = "mysql-native"
))]
impl TransactionOptions {
pub fn new(isolation_level: Option<IsolationLevel>, isolation_first: bool) -> Self {
Self {
isolation_level,
isolation_first,
}
}
}
pub struct DefaultTransaction<'a> {
pub inner: &'a dyn Queryable,
gauge: GaugeGuard,
}
#[cfg_attr(
not(any(
feature = "sqlite-native",
feature = "mssql-native",
feature = "postgresql-native",
feature = "mysql-native"
)),
allow(clippy::needless_lifetimes)
)]
impl<'a> DefaultTransaction<'a> {
#[cfg(any(
feature = "sqlite-native",
feature = "mssql-native",
feature = "postgresql-native",
feature = "mysql-native"
))]
pub(crate) async fn new(
inner: &'a dyn Queryable,
begin_stmt: &str,
tx_opts: TransactionOptions,
) -> crate::Result<DefaultTransaction<'a>> {
let this = Self {
inner,
gauge: GaugeGuard::increment("prisma_client_queries_active"),
};
if tx_opts.isolation_first
&& let Some(isolation) = tx_opts.isolation_level
{
inner.set_tx_isolation_level(isolation).await?;
}
inner.raw_cmd(begin_stmt).await?;
if !tx_opts.isolation_first
&& let Some(isolation) = tx_opts.isolation_level
{
inner.set_tx_isolation_level(isolation).await?;
}
inner.server_reset_query(&this).await?;
Ok(this)
}
}
#[async_trait]
impl Transaction for DefaultTransaction<'_> {
async fn commit(&self) -> crate::Result<()> {
self.gauge.decrement();
self.inner.raw_cmd("COMMIT").await?;
Ok(())
}
async fn rollback(&self) -> crate::Result<()> {
self.gauge.decrement();
self.inner.raw_cmd("ROLLBACK").await?;
Ok(())
}
fn as_queryable(&self) -> &dyn Queryable {
self
}
}
#[async_trait]
impl Queryable for DefaultTransaction<'_> {
async fn query(&self, q: Query<'_>) -> crate::Result<ResultSet> {
self.inner.query(q).await
}
async fn execute(&self, q: Query<'_>) -> crate::Result<u64> {
self.inner.execute(q).await
}
async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
self.inner.query_raw(sql, params).await
}
async fn query_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<ResultSet> {
self.inner.query_raw_typed(sql, params).await
}
async fn describe_query(&self, sql: &str) -> crate::Result<DescribedQuery> {
self.inner.describe_query(sql).await
}
async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
self.inner.execute_raw(sql, params).await
}
async fn execute_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> crate::Result<u64> {
self.inner.execute_raw_typed(sql, params).await
}
async fn raw_cmd(&self, cmd: &str) -> crate::Result<()> {
self.inner.raw_cmd(cmd).await
}
async fn version(&self) -> crate::Result<Option<String>> {
self.inner.version().await
}
fn is_healthy(&self) -> bool {
self.inner.is_healthy()
}
async fn set_tx_isolation_level(&self, isolation_level: IsolationLevel) -> crate::Result<()> {
self.inner.set_tx_isolation_level(isolation_level).await
}
fn requires_isolation_first(&self) -> bool {
self.inner.requires_isolation_first()
}
}
#[derive(Debug, Clone, Copy)]
pub enum IsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Snapshot,
Serializable,
}
impl fmt::Display for IsolationLevel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ReadUncommitted => write!(f, "READ UNCOMMITTED"),
Self::ReadCommitted => write!(f, "READ COMMITTED"),
Self::RepeatableRead => write!(f, "REPEATABLE READ"),
Self::Snapshot => write!(f, "SNAPSHOT"),
Self::Serializable => write!(f, "SERIALIZABLE"),
}
}
}
impl FromStr for IsolationLevel {
type Err = Error;
fn from_str(s: &str) -> crate::Result<Self> {
match s.to_lowercase().as_str() {
"read uncommitted" | "readuncommitted" => Ok(Self::ReadUncommitted),
"read committed" | "readcommitted" => Ok(Self::ReadCommitted),
"repeatable read" | "repeatableread" => Ok(Self::RepeatableRead),
"snapshot" => Ok(Self::Snapshot),
"serializable" => Ok(Self::Serializable),
_ => {
let kind = ErrorKind::conversion(format!("Invalid isolation level `{s}`"));
Err(Error::builder(kind).build())
}
}
}
}