reifydb_engine/
session.rs1use std::{thread, time::Duration};
11
12use reifydb_core::{execution::ExecutionResult, interface::catalog::token::Token};
13use reifydb_type::{params::Params, value::identity::IdentityId};
14use tracing::{instrument, warn};
15
16use crate::engine::StandardEngine;
17
/// Delay policy applied between consecutive retry attempts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Backoff {
    /// Retry immediately, without sleeping.
    None,
    /// Sleep for the same duration before every retry.
    Fixed(Duration),
    /// Sleep for `base * 2^attempt` (attempts counted from zero), capped at `max`.
    Exponential {
        /// Delay used before the first retry.
        base: Duration,
        /// Upper bound applied to every computed delay.
        max: Duration,
    },
}

/// Describes how many times an operation is attempted and how long to wait
/// between attempts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetryStrategy {
    /// Total number of attempts, counting the initial one.
    pub max_attempts: u32,
    /// Delay policy applied between attempts.
    pub backoff: Backoff,
}
36
37impl Default for RetryStrategy {
38 fn default() -> Self {
39 Self {
40 max_attempts: 3,
41 backoff: Backoff::None,
42 }
43 }
44}
45
46impl RetryStrategy {
47 pub fn no_retry() -> Self {
49 Self {
50 max_attempts: 1,
51 backoff: Backoff::None,
52 }
53 }
54
55 pub fn default_conflict_retry() -> Self {
57 Self::default()
58 }
59
60 pub fn with_fixed_backoff(max_attempts: u32, delay: Duration) -> Self {
62 Self {
63 max_attempts,
64 backoff: Backoff::Fixed(delay),
65 }
66 }
67
68 pub fn with_exponential_backoff(max_attempts: u32, base: Duration, max: Duration) -> Self {
70 Self {
71 max_attempts,
72 backoff: Backoff::Exponential {
73 base,
74 max,
75 },
76 }
77 }
78
79 pub fn execute<F>(&self, _rql: &str, mut f: F) -> ExecutionResult
80 where
81 F: FnMut() -> ExecutionResult,
82 {
83 let mut last_result = None;
84 for attempt in 0..self.max_attempts {
85 let result = f();
86 match &result.error {
87 None => return result,
88 Some(err) if err.code == "TXN_001" => {
89 warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
90 last_result = Some(result);
91 if attempt + 1 < self.max_attempts {
92 match &self.backoff {
93 Backoff::None => {}
94 Backoff::Fixed(d) => thread::sleep(*d),
95 Backoff::Exponential {
96 base,
97 max,
98 } => {
99 let delay = (*base) * 2u32.saturating_pow(attempt);
100 thread::sleep(delay.min(*max));
101 }
102 }
103 }
104 }
105 Some(_) => {
106 return result;
107 }
108 }
109 }
110 last_result.unwrap()
111 }
112}
113
114pub struct Session {
116 engine: StandardEngine,
117 identity: IdentityId,
118 authenticated: bool,
119 token: Option<String>,
120 retry: RetryStrategy,
121}
122
123impl Session {
124 pub fn from_token(engine: StandardEngine, info: &Token) -> Self {
126 Self {
127 engine,
128 identity: info.identity,
129 authenticated: true,
130 token: None,
131 retry: RetryStrategy::default(),
132 }
133 }
134
135 pub fn from_token_with_value(engine: StandardEngine, info: &Token) -> Self {
137 Self {
138 engine,
139 identity: info.identity,
140 authenticated: true,
141 token: Some(info.token.clone()),
142 retry: RetryStrategy::default(),
143 }
144 }
145
146 pub fn trusted(engine: StandardEngine, identity: IdentityId) -> Self {
148 Self {
149 engine,
150 identity,
151 authenticated: false,
152 token: None,
153 retry: RetryStrategy::default(),
154 }
155 }
156
157 pub fn anonymous(engine: StandardEngine) -> Self {
159 Self::trusted(engine, IdentityId::anonymous())
160 }
161
162 pub fn with_retry(mut self, strategy: RetryStrategy) -> Self {
164 self.retry = strategy;
165 self
166 }
167
168 #[inline]
170 pub fn identity(&self) -> IdentityId {
171 self.identity
172 }
173
174 #[inline]
176 pub fn token(&self) -> Option<&str> {
177 self.token.as_deref()
178 }
179
180 #[inline]
182 pub fn is_authenticated(&self) -> bool {
183 self.authenticated
184 }
185
186 #[instrument(name = "session::query", level = "debug", skip(self, params), fields(rql = %rql))]
188 pub fn query(&self, rql: &str, params: impl Into<Params>) -> ExecutionResult {
189 self.engine.query_as(self.identity, rql, params.into())
190 }
191
192 #[instrument(name = "session::command", level = "debug", skip(self, params), fields(rql = %rql))]
194 pub fn command(&self, rql: &str, params: impl Into<Params>) -> ExecutionResult {
195 let params = params.into();
196 self.retry.execute(rql, || self.engine.command_as(self.identity, rql, params.clone()))
197 }
198
199 #[instrument(name = "session::admin", level = "debug", skip(self, params), fields(rql = %rql))]
201 pub fn admin(&self, rql: &str, params: impl Into<Params>) -> ExecutionResult {
202 let params = params.into();
203 self.retry.execute(rql, || self.engine.admin_as(self.identity, rql, params.clone()))
204 }
205}