Skip to main content

reifydb_engine/
session.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Unified session type for database access.
5//!
6//! A `Session` binds an identity to an engine and provides query, command, and
7//! admin methods. Sessions are created either from a validated auth token
8//! (server path) or directly from an `IdentityId` (embedded/trusted path).
9
10use std::{thread, time::Duration};
11
12use reifydb_core::interface::catalog::token::Token;
13use reifydb_type::{
14	error::Error,
15	params::Params,
16	value::{frame::frame::Frame, identity::IdentityId},
17};
18use tracing::{instrument, warn};
19
20use crate::engine::StandardEngine;
21
22/// Backoff strategy between retry attempts.
23pub enum Backoff {
24	/// No delay between retries.
25	None,
26	/// Fixed delay between each retry attempt.
27	Fixed(Duration),
28	/// Exponential backoff: delay doubles each attempt, capped at `max`.
29	Exponential {
30		base: Duration,
31		max: Duration,
32	},
33}
34
35/// Controls how many times a write transaction is retried on conflict (`TXN_001`).
36pub struct RetryPolicy {
37	pub max_attempts: u32,
38	pub backoff: Backoff,
39}
40
41impl Default for RetryPolicy {
42	fn default() -> Self {
43		Self {
44			max_attempts: 3,
45			backoff: Backoff::None,
46		}
47	}
48}
49
50impl RetryPolicy {
51	/// No retries — fail immediately on conflict.
52	pub fn no_retry() -> Self {
53		Self {
54			max_attempts: 1,
55			backoff: Backoff::None,
56		}
57	}
58
59	/// Default conflict retry: 3 attempts, no backoff (matches legacy engine behavior).
60	pub fn default_conflict_retry() -> Self {
61		Self::default()
62	}
63
64	/// Fixed delay between retry attempts.
65	pub fn with_fixed_backoff(max_attempts: u32, delay: Duration) -> Self {
66		Self {
67			max_attempts,
68			backoff: Backoff::Fixed(delay),
69		}
70	}
71
72	/// Exponential backoff: delay doubles each attempt, capped at `max`.
73	pub fn with_exponential_backoff(max_attempts: u32, base: Duration, max: Duration) -> Self {
74		Self {
75			max_attempts,
76			backoff: Backoff::Exponential {
77				base,
78				max,
79			},
80		}
81	}
82
83	pub fn execute<F>(&self, rql: &str, mut f: F) -> Result<Vec<Frame>, Error>
84	where
85		F: FnMut() -> Result<Vec<Frame>, Error>,
86	{
87		let mut last_err = None;
88		for attempt in 0..self.max_attempts {
89			match f() {
90				Ok(frames) => return Ok(frames),
91				Err(err) if err.code == "TXN_001" => {
92					warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
93					last_err = Some(err);
94					if attempt + 1 < self.max_attempts {
95						match &self.backoff {
96							Backoff::None => {}
97							Backoff::Fixed(d) => thread::sleep(*d),
98							Backoff::Exponential {
99								base,
100								max,
101							} => {
102								let delay = (*base) * 2u32.saturating_pow(attempt);
103								thread::sleep(delay.min(*max));
104							}
105						}
106					}
107				}
108				Err(mut err) => {
109					err.with_statement(rql.to_string());
110					return Err(err);
111				}
112			}
113		}
114		let mut err = last_err.unwrap();
115		err.with_statement(rql.to_string());
116		Err(err)
117	}
118}
119
120/// A unified session binding an identity to a database engine.
121pub struct Session {
122	engine: StandardEngine,
123	identity: IdentityId,
124	authenticated: bool,
125	token: Option<String>,
126	retry: RetryPolicy,
127}
128
129impl Session {
130	/// Create a session from a validated auth token (server path).
131	pub fn from_token(engine: StandardEngine, info: &Token) -> Self {
132		Self {
133			engine,
134			identity: info.identity,
135			authenticated: true,
136			token: None,
137			retry: RetryPolicy::default(),
138		}
139	}
140
141	/// Create a session from a validated auth token, preserving the token string.
142	pub fn from_token_with_value(engine: StandardEngine, info: &Token) -> Self {
143		Self {
144			engine,
145			identity: info.identity,
146			authenticated: true,
147			token: Some(info.token.clone()),
148			retry: RetryPolicy::default(),
149		}
150	}
151
152	/// Create a trusted session (embedded path, no authentication required).
153	pub fn trusted(engine: StandardEngine, identity: IdentityId) -> Self {
154		Self {
155			engine,
156			identity,
157			authenticated: false,
158			token: None,
159			retry: RetryPolicy::default(),
160		}
161	}
162
163	/// Create an anonymous session.
164	pub fn anonymous(engine: StandardEngine) -> Self {
165		Self::trusted(engine, IdentityId::anonymous())
166	}
167
168	/// Set the retry policy for command and admin operations.
169	pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
170		self.retry = policy;
171		self
172	}
173
174	/// The identity associated with this session.
175	#[inline]
176	pub fn identity(&self) -> IdentityId {
177		self.identity
178	}
179
180	/// The auth token, if this session was created from a validated token.
181	#[inline]
182	pub fn token(&self) -> Option<&str> {
183		self.token.as_deref()
184	}
185
186	/// Whether this session was created from authenticated credentials.
187	#[inline]
188	pub fn is_authenticated(&self) -> bool {
189		self.authenticated
190	}
191
192	/// Execute a read-only query.
193	#[instrument(name = "session::query", level = "debug", skip(self, params), fields(rql = %rql))]
194	pub fn query(&self, rql: &str, params: impl Into<Params>) -> Result<Vec<Frame>, Error> {
195		self.engine.query_as(self.identity, rql, params.into())
196	}
197
198	/// Execute a transactional command (DML + Query) with retry on conflict.
199	#[instrument(name = "session::command", level = "debug", skip(self, params), fields(rql = %rql))]
200	pub fn command(&self, rql: &str, params: impl Into<Params>) -> Result<Vec<Frame>, Error> {
201		let params = params.into();
202		self.retry.execute(rql, || self.engine.command_as(self.identity, rql, params.clone()))
203	}
204
205	/// Execute an admin (DDL + DML + Query) operation with retry on conflict.
206	#[instrument(name = "session::admin", level = "debug", skip(self, params), fields(rql = %rql))]
207	pub fn admin(&self, rql: &str, params: impl Into<Params>) -> Result<Vec<Frame>, Error> {
208		let params = params.into();
209		self.retry.execute(rql, || self.engine.admin_as(self.identity, rql, params.clone()))
210	}
211}