use std::{thread, time::Duration};
use reifydb_core::{execution::ExecutionResult, interface::catalog::token::Token};
use reifydb_type::{params::Params, value::identity::IdentityId};
use tracing::{instrument, warn};
use crate::engine::StandardEngine;
/// Delay policy applied between two consecutive attempts of a retried operation.
///
/// The derives make the policy printable in logs, copyable into several
/// strategies, and comparable in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Backoff {
    /// Retry immediately, without sleeping.
    None,
    /// Sleep for the same duration before every retry.
    Fixed(Duration),
    /// Sleep for `base` before the first retry and double the delay on each
    /// further retry, never sleeping longer than `max`.
    Exponential {
        /// Delay used before the first retry.
        base: Duration,
        /// Upper bound for any single delay.
        max: Duration,
    },
}
/// Describes how often an operation is attempted and how long to wait between
/// attempts when it fails with a transaction conflict (error code `TXN_001`).
pub struct RetryStrategy {
/// Total number of times the operation may be run, including the first try.
pub max_attempts: u32,
/// Delay policy applied between two consecutive attempts.
pub backoff: Backoff,
}
impl Default for RetryStrategy {
fn default() -> Self {
Self {
max_attempts: 3,
backoff: Backoff::None,
}
}
}
impl RetryStrategy {
    /// Strategy that runs the operation exactly once and never retries.
    pub fn no_retry() -> Self {
        Self {
            max_attempts: 1,
            backoff: Backoff::None,
        }
    }

    /// Strategy used for transaction conflicts; identical to [`Default::default`].
    pub fn default_conflict_retry() -> Self {
        Self::default()
    }

    /// Strategy that runs the operation up to `max_attempts` times, sleeping
    /// `delay` before every retry.
    pub fn with_fixed_backoff(max_attempts: u32, delay: Duration) -> Self {
        Self {
            max_attempts,
            backoff: Backoff::Fixed(delay),
        }
    }

    /// Strategy that runs the operation up to `max_attempts` times, sleeping
    /// `base`, `2 * base`, `4 * base`, ... before successive retries, with each
    /// delay capped at `max`.
    pub fn with_exponential_backoff(max_attempts: u32, base: Duration, max: Duration) -> Self {
        Self {
            max_attempts,
            backoff: Backoff::Exponential {
                base,
                max,
            },
        }
    }

    /// Runs `f`, re-running it while it fails with a transaction conflict.
    ///
    /// A result is returned immediately when it carries no error or an error
    /// whose code is anything other than `TXN_001`. A `TXN_001` result triggers
    /// another attempt (after the configured backoff) until the attempt budget
    /// is used up, at which point the last conflicted result is returned.
    ///
    /// `f` is always invoked at least once: a strategy configured with
    /// `max_attempts == 0` behaves like one configured with `1` instead of
    /// panicking for lack of a result to return.
    ///
    /// `_rql` is accepted for call-site symmetry but is not used here.
    pub fn execute<F>(&self, _rql: &str, mut f: F) -> ExecutionResult
    where
        F: FnMut() -> ExecutionResult,
    {
        // Guarantee one attempt so there is always a result to hand back.
        let max_attempts = self.max_attempts.max(1);
        let mut attempt: u32 = 0;
        loop {
            let result = f();
            let conflicted = matches!(&result.error, Some(err) if err.code == "TXN_001");
            if !conflicted {
                return result;
            }
            // `attempt < max_attempts <= u32::MAX`, so `attempt + 1` cannot overflow.
            if attempt + 1 >= max_attempts {
                warn!(attempt = attempt + 1, "Transaction conflict detected, retry attempts exhausted");
                return result;
            }
            warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
            if let Some(delay) = self.retry_delay(attempt) {
                thread::sleep(delay);
            }
            attempt += 1;
        }
    }

    /// Returns how long to sleep after the zero-based `attempt` conflicted, or
    /// `None` when the strategy retries immediately.
    fn retry_delay(&self, attempt: u32) -> Option<Duration> {
        match &self.backoff {
            Backoff::None => None,
            Backoff::Fixed(delay) => Some(*delay),
            Backoff::Exponential {
                base,
                max,
            } => {
                // `Duration * u32` panics on overflow; saturate and then cap instead.
                let factor = 2u32.saturating_pow(attempt);
                Some(base.saturating_mul(factor).min(*max))
            }
        }
    }
}
/// A handle that runs statements against an engine on behalf of one identity.
pub struct Session {
// Engine that `query`, `command` and `admin` delegate to.
engine: StandardEngine,
// Identity passed to every engine call made through this session.
identity: IdentityId,
// `true` when built from a token, `false` when built via `trusted`/`anonymous`.
authenticated: bool,
// Token string, kept only when built with `from_token_with_value`.
token: Option<String>,
// Retry policy applied by `command` and `admin`; `query` does not use it.
retry: RetryStrategy,
}
impl Session {
pub fn from_token(engine: StandardEngine, info: &Token) -> Self {
Self {
engine,
identity: info.identity,
authenticated: true,
token: None,
retry: RetryStrategy::default(),
}
}
pub fn from_token_with_value(engine: StandardEngine, info: &Token) -> Self {
Self {
engine,
identity: info.identity,
authenticated: true,
token: Some(info.token.clone()),
retry: RetryStrategy::default(),
}
}
pub fn trusted(engine: StandardEngine, identity: IdentityId) -> Self {
Self {
engine,
identity,
authenticated: false,
token: None,
retry: RetryStrategy::default(),
}
}
pub fn anonymous(engine: StandardEngine) -> Self {
Self::trusted(engine, IdentityId::anonymous())
}
pub fn with_retry(mut self, strategy: RetryStrategy) -> Self {
self.retry = strategy;
self
}
#[inline]
pub fn identity(&self) -> IdentityId {
self.identity
}
#[inline]
pub fn token(&self) -> Option<&str> {
self.token.as_deref()
}
#[inline]
pub fn is_authenticated(&self) -> bool {
self.authenticated
}
#[instrument(name = "session::query", level = "debug", skip(self, params), fields(rql = %rql))]
pub fn query(&self, rql: &str, params: impl Into<Params>) -> ExecutionResult {
self.engine.query_as(self.identity, rql, params.into())
}
#[instrument(name = "session::command", level = "debug", skip(self, params), fields(rql = %rql))]
pub fn command(&self, rql: &str, params: impl Into<Params>) -> ExecutionResult {
let params = params.into();
self.retry.execute(rql, || self.engine.command_as(self.identity, rql, params.clone()))
}
#[instrument(name = "session::admin", level = "debug", skip(self, params), fields(rql = %rql))]
pub fn admin(&self, rql: &str, params: impl Into<Params>) -> ExecutionResult {
let params = params.into();
self.retry.execute(rql, || self.engine.admin_as(self.identity, rql, params.clone()))
}
}