Skip to main content

reifydb_engine/
session.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Unified session type for database access.
5//!
6//! A `Session` binds an identity to an engine and provides query, command, and
7//! admin methods. Sessions are created either from a validated auth token
8//! (server path) or directly from an `IdentityId` (embedded/trusted path).
9
10use std::{thread, time::Duration};
11
12use reifydb_core::{execution::ExecutionResult, interface::catalog::token::Token};
13use reifydb_type::{params::Params, value::identity::IdentityId};
14use tracing::{instrument, warn};
15
16use crate::engine::StandardEngine;
17
18/// Backoff strategy between retry attempts.
19pub enum Backoff {
20	/// No delay between retries.
21	None,
22	/// Fixed delay between each retry attempt.
23	Fixed(Duration),
24	/// Exponential backoff: delay doubles each attempt, capped at `max`.
25	Exponential {
26		base: Duration,
27		max: Duration,
28	},
29}
30
31/// Controls how many times a write transaction is retried on conflict (`TXN_001`).
32pub struct RetryStrategy {
33	pub max_attempts: u32,
34	pub backoff: Backoff,
35}
36
37impl Default for RetryStrategy {
38	fn default() -> Self {
39		Self {
40			max_attempts: 3,
41			backoff: Backoff::None,
42		}
43	}
44}
45
46impl RetryStrategy {
47	/// No retries — fail immediately on conflict.
48	pub fn no_retry() -> Self {
49		Self {
50			max_attempts: 1,
51			backoff: Backoff::None,
52		}
53	}
54
55	/// Default conflict retry: 3 attempts, no backoff (matches legacy engine behavior).
56	pub fn default_conflict_retry() -> Self {
57		Self::default()
58	}
59
60	/// Fixed delay between retry attempts.
61	pub fn with_fixed_backoff(max_attempts: u32, delay: Duration) -> Self {
62		Self {
63			max_attempts,
64			backoff: Backoff::Fixed(delay),
65		}
66	}
67
68	/// Exponential backoff: delay doubles each attempt, capped at `max`.
69	pub fn with_exponential_backoff(max_attempts: u32, base: Duration, max: Duration) -> Self {
70		Self {
71			max_attempts,
72			backoff: Backoff::Exponential {
73				base,
74				max,
75			},
76		}
77	}
78
79	pub fn execute<F>(&self, _rql: &str, mut f: F) -> ExecutionResult
80	where
81		F: FnMut() -> ExecutionResult,
82	{
83		let mut last_result = None;
84		for attempt in 0..self.max_attempts {
85			let result = f();
86			match &result.error {
87				None => return result,
88				Some(err) if err.code == "TXN_001" => {
89					warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
90					last_result = Some(result);
91					if attempt + 1 < self.max_attempts {
92						match &self.backoff {
93							Backoff::None => {}
94							Backoff::Fixed(d) => thread::sleep(*d),
95							Backoff::Exponential {
96								base,
97								max,
98							} => {
99								let delay = (*base) * 2u32.saturating_pow(attempt);
100								thread::sleep(delay.min(*max));
101							}
102						}
103					}
104				}
105				Some(_) => {
106					return result;
107				}
108			}
109		}
110		last_result.unwrap()
111	}
112}
113
114/// A unified session binding an identity to a database engine.
115pub struct Session {
116	engine: StandardEngine,
117	identity: IdentityId,
118	authenticated: bool,
119	token: Option<String>,
120	retry: RetryStrategy,
121}
122
123impl Session {
124	/// Create a session from a validated auth token (server path).
125	pub fn from_token(engine: StandardEngine, info: &Token) -> Self {
126		Self {
127			engine,
128			identity: info.identity,
129			authenticated: true,
130			token: None,
131			retry: RetryStrategy::default(),
132		}
133	}
134
135	/// Create a session from a validated auth token, preserving the token string.
136	pub fn from_token_with_value(engine: StandardEngine, info: &Token) -> Self {
137		Self {
138			engine,
139			identity: info.identity,
140			authenticated: true,
141			token: Some(info.token.clone()),
142			retry: RetryStrategy::default(),
143		}
144	}
145
146	/// Create a trusted session (embedded path, no authentication required).
147	pub fn trusted(engine: StandardEngine, identity: IdentityId) -> Self {
148		Self {
149			engine,
150			identity,
151			authenticated: false,
152			token: None,
153			retry: RetryStrategy::default(),
154		}
155	}
156
157	/// Create an anonymous session.
158	pub fn anonymous(engine: StandardEngine) -> Self {
159		Self::trusted(engine, IdentityId::anonymous())
160	}
161
162	/// Set the retry strategy for command and admin operations.
163	pub fn with_retry(mut self, strategy: RetryStrategy) -> Self {
164		self.retry = strategy;
165		self
166	}
167
168	/// The identity associated with this session.
169	#[inline]
170	pub fn identity(&self) -> IdentityId {
171		self.identity
172	}
173
174	/// The auth token, if this session was created from a validated token.
175	#[inline]
176	pub fn token(&self) -> Option<&str> {
177		self.token.as_deref()
178	}
179
180	/// Whether this session was created from authenticated credentials.
181	#[inline]
182	pub fn is_authenticated(&self) -> bool {
183		self.authenticated
184	}
185
186	/// Execute a read-only query.
187	#[instrument(name = "session::query", level = "debug", skip(self, params), fields(rql = %rql))]
188	pub fn query(&self, rql: &str, params: impl Into<Params>) -> ExecutionResult {
189		self.engine.query_as(self.identity, rql, params.into())
190	}
191
192	/// Execute a transactional command (DML + Query) with retry on conflict.
193	#[instrument(name = "session::command", level = "debug", skip(self, params), fields(rql = %rql))]
194	pub fn command(&self, rql: &str, params: impl Into<Params>) -> ExecutionResult {
195		let params = params.into();
196		self.retry.execute(rql, || self.engine.command_as(self.identity, rql, params.clone()))
197	}
198
199	/// Execute an admin (DDL + DML + Query) operation with retry on conflict.
200	#[instrument(name = "session::admin", level = "debug", skip(self, params), fields(rql = %rql))]
201	pub fn admin(&self, rql: &str, params: impl Into<Params>) -> ExecutionResult {
202		let params = params.into();
203		self.retry.execute(rql, || self.engine.admin_as(self.identity, rql, params.clone()))
204	}
205}