use std::{thread, time::Duration};
use reifydb_core::interface::catalog::token::Token;
use reifydb_type::{
error::Error,
params::Params,
value::{frame::frame::Frame, identity::IdentityId},
};
use tracing::{instrument, warn};
use crate::engine::StandardEngine;
/// How long to wait between two consecutive attempts of a retried operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Backoff {
    /// Retry immediately, without sleeping.
    None,
    /// Sleep for the same duration before every retry.
    Fixed(Duration),
    /// Sleep for `base * 2^attempt` (with `attempt` starting at zero), never
    /// longer than `max`.
    Exponential {
        /// Delay used before the first retry.
        base: Duration,
        /// Upper bound applied to every computed delay.
        max: Duration,
    },
}

/// Policy describing how often, and with which pauses, an operation is retried.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetryStrategy {
    /// Total number of times the operation may be tried, the first try
    /// included; `1` therefore means "no retry".
    pub max_attempts: u32,
    /// Pause inserted between two attempts.
    pub backoff: Backoff,
}
impl Default for RetryStrategy {
fn default() -> Self {
Self {
max_attempts: 3,
backoff: Backoff::None,
}
}
}
impl RetryStrategy {
    /// Returns a strategy that tries the operation exactly once.
    pub fn no_retry() -> Self {
        Self {
            max_attempts: 1,
            backoff: Backoff::None,
        }
    }

    /// Returns the strategy used for transaction conflicts; currently the same
    /// as [`RetryStrategy::default`].
    pub fn default_conflict_retry() -> Self {
        Self::default()
    }

    /// Returns a strategy that sleeps for `delay` between attempts.
    pub fn with_fixed_backoff(max_attempts: u32, delay: Duration) -> Self {
        Self {
            max_attempts,
            backoff: Backoff::Fixed(delay),
        }
    }

    /// Returns a strategy whose delay starts at `base`, doubles after every
    /// failed attempt and is capped at `max`.
    pub fn with_exponential_backoff(max_attempts: u32, base: Duration, max: Duration) -> Self {
        Self {
            max_attempts,
            backoff: Backoff::Exponential {
                base,
                max,
            },
        }
    }

    /// Runs `f`, retrying it while it fails with a transaction conflict.
    ///
    /// An error counts as a conflict when its `code` equals `"TXN_001"`. Any
    /// other error is returned immediately. `f` is always called at least once,
    /// even when `max_attempts` is `0`.
    ///
    /// # Errors
    ///
    /// Returns the first non-conflict error, or the last conflict error once
    /// the attempt budget is used up. In both cases `rql` is attached to the
    /// error through `with_statement` before it is returned.
    pub fn execute<F>(&self, rql: &str, mut f: F) -> Result<Vec<Frame>, Error>
    where
        F: FnMut() -> Result<Vec<Frame>, Error>,
    {
        // A budget of zero would skip `f` entirely and leave neither frames nor
        // an error to hand back, so the budget is treated as at least one.
        let attempts = self.max_attempts.max(1);
        let mut attempt: u32 = 0;
        loop {
            let mut err = match f() {
                Ok(frames) => return Ok(frames),
                Err(err) => err,
            };

            let is_conflict = err.code == "TXN_001";
            if is_conflict {
                warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
            }

            // Give up on anything that is not a conflict, and on a conflict
            // once this was the last permitted attempt.
            if !is_conflict || attempt + 1 >= attempts {
                err.with_statement(rql.to_string());
                return Err(err);
            }

            if let Some(delay) = self.retry_delay(attempt) {
                thread::sleep(delay);
            }
            attempt += 1;
        }
    }

    /// Computes the pause that follows the failed zero-based `attempt`, or
    /// `None` when the strategy does not sleep at all.
    fn retry_delay(&self, attempt: u32) -> Option<Duration> {
        match &self.backoff {
            Backoff::None => None,
            Backoff::Fixed(delay) => Some(*delay),
            Backoff::Exponential {
                base,
                max,
            } => {
                // `Duration * u32` panics on overflow; saturate instead and let
                // the cap below bring the value back into range.
                let factor = 2u32.saturating_pow(attempt);
                Some(base.saturating_mul(factor).min(*max))
            }
        }
    }
}
/// Binds a `StandardEngine` to one identity so RQL can be run on its behalf.
pub struct Session {
/// Engine that `query`, `command` and `admin` forward to.
engine: StandardEngine,
/// Identity passed to the engine with every statement.
identity: IdentityId,
/// `true` for sessions built from a `Token`, `false` for `trusted` and
/// `anonymous` sessions.
authenticated: bool,
/// Token string, kept only when the session was built with
/// `from_token_with_value`.
token: Option<String>,
/// Retry policy applied by `command` and `admin`.
retry: RetryStrategy,
}
impl Session {
    /// Creates an authenticated session for the identity stored in `info`.
    /// The token string itself is not retained.
    pub fn from_token(engine: StandardEngine, info: &Token) -> Self {
        Self::assemble(engine, info.identity, true, None)
    }

    /// Creates an authenticated session for the identity stored in `info` and
    /// keeps a copy of the token string, retrievable through [`Session::token`].
    pub fn from_token_with_value(engine: StandardEngine, info: &Token) -> Self {
        let token_value = info.token.clone();
        Self::assemble(engine, info.identity, true, Some(token_value))
    }

    /// Creates a session for a caller-supplied identity. The session carries
    /// no token and is not flagged as authenticated.
    pub fn trusted(engine: StandardEngine, identity: IdentityId) -> Self {
        Self::assemble(engine, identity, false, None)
    }

    /// Creates a trusted session bound to `IdentityId::anonymous()`.
    pub fn anonymous(engine: StandardEngine) -> Self {
        let nobody = IdentityId::anonymous();
        Self::trusted(engine, nobody)
    }

    /// Replaces the retry strategy and hands the session back for chaining.
    pub fn with_retry(self, strategy: RetryStrategy) -> Self {
        Self {
            retry: strategy,
            ..self
        }
    }

    /// Identity this session acts as.
    #[inline]
    pub fn identity(&self) -> IdentityId {
        self.identity
    }

    /// Token string, if the session was created with one.
    #[inline]
    pub fn token(&self) -> Option<&str> {
        self.token.as_deref()
    }

    /// Whether the session was created from a token.
    #[inline]
    pub fn is_authenticated(&self) -> bool {
        self.authenticated
    }

    /// Runs `rql` through the engine's `query_as`. Queries are not retried.
    #[instrument(name = "session::query", level = "debug", skip(self, params), fields(rql = %rql))]
    pub fn query(&self, rql: &str, params: impl Into<Params>) -> Result<Vec<Frame>, Error> {
        let bound: Params = params.into();
        self.engine.query_as(self.identity, rql, bound)
    }

    /// Runs `rql` through the engine's `command_as` under the session's retry
    /// strategy.
    #[instrument(name = "session::command", level = "debug", skip(self, params), fields(rql = %rql))]
    pub fn command(&self, rql: &str, params: impl Into<Params>) -> Result<Vec<Frame>, Error> {
        let bound: Params = params.into();
        // Every attempt needs its own copy of the parameters.
        let run_once = || self.engine.command_as(self.identity, rql, bound.clone());
        self.retry.execute(rql, run_once)
    }

    /// Runs `rql` through the engine's `admin_as` under the session's retry
    /// strategy.
    #[instrument(name = "session::admin", level = "debug", skip(self, params), fields(rql = %rql))]
    pub fn admin(&self, rql: &str, params: impl Into<Params>) -> Result<Vec<Frame>, Error> {
        let bound: Params = params.into();
        // Every attempt needs its own copy of the parameters.
        let run_once = || self.engine.admin_as(self.identity, rql, bound.clone());
        self.retry.execute(rql, run_once)
    }

    /// Shared constructor: every session starts with the default retry
    /// strategy.
    fn assemble(
        engine: StandardEngine,
        identity: IdentityId,
        authenticated: bool,
        token: Option<String>,
    ) -> Self {
        Self {
            engine,
            identity,
            authenticated,
            token,
            retry: RetryStrategy::default(),
        }
    }
}