1use std::{thread, time::Duration};
11
12use reifydb_core::interface::catalog::token::Token;
13use reifydb_type::{
14 error::Error,
15 params::Params,
16 value::{frame::frame::Frame, identity::IdentityId},
17};
18use tracing::{instrument, warn};
19
20use crate::engine::StandardEngine;
21
/// Delay strategy applied between two consecutive retry attempts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Backoff {
    /// Retry immediately, without sleeping.
    None,
    /// Sleep for the same duration before every retry.
    Fixed(Duration),
    /// Sleep for a duration that doubles with every failed attempt.
    Exponential {
        /// Delay used after the first failed attempt.
        base: Duration,
        /// Upper bound the computed delay is clamped to.
        max: Duration,
    },
}

/// Describes how often an operation is attempted and how long to wait between attempts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetryPolicy {
    /// Total number of attempts, counting the initial one.
    pub max_attempts: u32,
    /// Delay strategy used between attempts.
    pub backoff: Backoff,
}
40
41impl Default for RetryPolicy {
42 fn default() -> Self {
43 Self {
44 max_attempts: 3,
45 backoff: Backoff::None,
46 }
47 }
48}
49
50impl RetryPolicy {
51 pub fn no_retry() -> Self {
53 Self {
54 max_attempts: 1,
55 backoff: Backoff::None,
56 }
57 }
58
59 pub fn default_conflict_retry() -> Self {
61 Self::default()
62 }
63
64 pub fn with_fixed_backoff(max_attempts: u32, delay: Duration) -> Self {
66 Self {
67 max_attempts,
68 backoff: Backoff::Fixed(delay),
69 }
70 }
71
72 pub fn with_exponential_backoff(max_attempts: u32, base: Duration, max: Duration) -> Self {
74 Self {
75 max_attempts,
76 backoff: Backoff::Exponential {
77 base,
78 max,
79 },
80 }
81 }
82
83 pub fn execute<F>(&self, rql: &str, mut f: F) -> Result<Vec<Frame>, Error>
84 where
85 F: FnMut() -> Result<Vec<Frame>, Error>,
86 {
87 let mut last_err = None;
88 for attempt in 0..self.max_attempts {
89 match f() {
90 Ok(frames) => return Ok(frames),
91 Err(err) if err.code == "TXN_001" => {
92 warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
93 last_err = Some(err);
94 if attempt + 1 < self.max_attempts {
95 match &self.backoff {
96 Backoff::None => {}
97 Backoff::Fixed(d) => thread::sleep(*d),
98 Backoff::Exponential {
99 base,
100 max,
101 } => {
102 let delay = (*base) * 2u32.saturating_pow(attempt);
103 thread::sleep(delay.min(*max));
104 }
105 }
106 }
107 }
108 Err(mut err) => {
109 err.with_statement(rql.to_string());
110 return Err(err);
111 }
112 }
113 }
114 let mut err = last_err.unwrap();
115 err.with_statement(rql.to_string());
116 Err(err)
117 }
118}
119
120pub struct Session {
122 engine: StandardEngine,
123 identity: IdentityId,
124 authenticated: bool,
125 token: Option<String>,
126 retry: RetryPolicy,
127}
128
129impl Session {
130 pub fn from_token(engine: StandardEngine, info: &Token) -> Self {
132 Self {
133 engine,
134 identity: info.identity,
135 authenticated: true,
136 token: None,
137 retry: RetryPolicy::default(),
138 }
139 }
140
141 pub fn from_token_with_value(engine: StandardEngine, info: &Token) -> Self {
143 Self {
144 engine,
145 identity: info.identity,
146 authenticated: true,
147 token: Some(info.token.clone()),
148 retry: RetryPolicy::default(),
149 }
150 }
151
152 pub fn trusted(engine: StandardEngine, identity: IdentityId) -> Self {
154 Self {
155 engine,
156 identity,
157 authenticated: false,
158 token: None,
159 retry: RetryPolicy::default(),
160 }
161 }
162
163 pub fn anonymous(engine: StandardEngine) -> Self {
165 Self::trusted(engine, IdentityId::anonymous())
166 }
167
168 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
170 self.retry = policy;
171 self
172 }
173
174 #[inline]
176 pub fn identity(&self) -> IdentityId {
177 self.identity
178 }
179
180 #[inline]
182 pub fn token(&self) -> Option<&str> {
183 self.token.as_deref()
184 }
185
186 #[inline]
188 pub fn is_authenticated(&self) -> bool {
189 self.authenticated
190 }
191
192 #[instrument(name = "session::query", level = "debug", skip(self, params), fields(rql = %rql))]
194 pub fn query(&self, rql: &str, params: impl Into<Params>) -> Result<Vec<Frame>, Error> {
195 self.engine.query_as(self.identity, rql, params.into())
196 }
197
198 #[instrument(name = "session::command", level = "debug", skip(self, params), fields(rql = %rql))]
200 pub fn command(&self, rql: &str, params: impl Into<Params>) -> Result<Vec<Frame>, Error> {
201 let params = params.into();
202 self.retry.execute(rql, || self.engine.command_as(self.identity, rql, params.clone()))
203 }
204
205 #[instrument(name = "session::admin", level = "debug", skip(self, params), fields(rql = %rql))]
207 pub fn admin(&self, rql: &str, params: impl Into<Params>) -> Result<Vec<Frame>, Error> {
208 let params = params.into();
209 self.retry.execute(rql, || self.engine.admin_as(self.identity, rql, params.clone()))
210 }
211}