Skip to main content

reinhardt_db/orm/
connection.rs

1//! Database connection management
2//!
3//! This module provides the main `DatabaseConnection` type which wraps
4//! the backend-specific connection implementations.
5
6use async_trait::async_trait;
7
8/// Re-export backends types
9pub use crate::backends::connection::DatabaseConnection as BackendsConnection;
10pub use crate::backends::types::{IsolationLevel, QueryValue, Row, TransactionExecutor};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13/// Defines possible database backend values.
14pub enum DatabaseBackend {
15	/// Postgres variant.
16	Postgres,
17	/// MySql variant.
18	MySql,
19	/// Sqlite variant.
20	Sqlite,
21}
22
23/// Query row wrapper for ORM compatibility
24#[derive(serde::Serialize)]
25pub struct QueryRow {
26	/// The data.
27	pub data: serde_json::Value,
28	// Allow dead_code: field reserved for future connection metadata tracking
29	#[allow(dead_code)]
30	#[serde(skip)]
31	inner: Option<Row>,
32}
33
34impl QueryRow {
35	/// Creates a new instance.
36	pub fn new(data: serde_json::Value) -> Self {
37		Self { data, inner: None }
38	}
39
40	/// Creates an instance from backend row.
41	pub fn from_backend_row(row: Row) -> Self {
42		// Convert Row to JSON for backward compatibility
43		let mut map = serde_json::Map::new();
44		for (key, value) in row.data.iter() {
45			let json_value = match value.clone() {
46				QueryValue::Null => serde_json::Value::Null,
47				QueryValue::Bool(b) => serde_json::Value::Bool(b),
48				QueryValue::Int(i) => serde_json::Value::Number(i.into()),
49				QueryValue::Float(f) => serde_json::Number::from_f64(f)
50					.map(serde_json::Value::Number)
51					.unwrap_or(serde_json::Value::Null),
52				QueryValue::String(s) => serde_json::Value::String(s),
53				QueryValue::Bytes(b) => {
54					// Encode bytes as base64 string
55					use base64::Engine;
56					serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(&b))
57				}
58				QueryValue::Timestamp(dt) => serde_json::Value::String(dt.to_rfc3339()),
59				QueryValue::Uuid(u) => serde_json::Value::String(u.to_string()),
60				// NOW() should never appear in Row data (it's resolved to actual timestamp in database)
61				QueryValue::Now => panic!("QueryValue::Now should not appear in Row data"),
62			};
63			map.insert(key.clone(), json_value);
64		}
65
66		Self {
67			data: serde_json::Value::Object(map),
68			inner: Some(row),
69		}
70	}
71
72	/// Get a value from the row by column name
73	///
74	/// This method extracts a value from the row's JSON data by key.
75	/// Supports common types like i64, f64, bool, and String.
76	pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
77		self.data
78			.get(key)
79			.and_then(|v| serde_json::from_value(v.clone()).ok())
80	}
81}
82
83#[async_trait]
84/// Trait defining database executor behavior.
85pub trait DatabaseExecutor: Send + Sync {
86	/// Executes a SQL statement and returns the number of affected rows.
87	async fn execute(&self, sql: &str) -> Result<u64, anyhow::Error>;
88	/// Executes a SQL query and returns the resulting rows.
89	async fn query(&self, sql: &str) -> Result<Vec<QueryRow>, anyhow::Error>;
90}
91
92/// Database connection wrapper
93#[derive(Clone)]
94pub struct DatabaseConnection {
95	backend: DatabaseBackend,
96	inner: BackendsConnection,
97}
98
99impl DatabaseConnection {
100	/// Creates a new instance.
101	pub fn new(backend: DatabaseBackend, inner: BackendsConnection) -> Self {
102		Self { backend, inner }
103	}
104
105	/// Connect to a database from a connection URL
106	///
107	/// Automatically detects the database type from the URL scheme:
108	/// - `postgres://` or `postgresql://` → PostgreSQL
109	/// - `mysql://` → MySQL
110	/// - `sqlite://` or `sqlite:` → SQLite
111	///
112	/// # Examples
113	///
114	/// ```no_run
115	/// # async fn example() {
116	/// use reinhardt_db::orm::connection::DatabaseConnection;
117	///
118	/// let conn = DatabaseConnection::connect("postgres://localhost/mydb").await.unwrap();
119	/// # }
120	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
121	/// ```
122	pub async fn connect(url: &str) -> Result<Self, anyhow::Error> {
123		Self::connect_with_pool_size(url, None).await
124	}
125
126	/// Connect to a PostgreSQL database
127	///
128	/// # Examples
129	///
130	/// ```no_run
131	/// # async fn example() {
132	/// use reinhardt_db::orm::connection::DatabaseConnection;
133	///
134	/// let conn = DatabaseConnection::connect_postgres("postgres://localhost/mydb").await.unwrap();
135	/// # }
136	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
137	/// ```
138	#[cfg(feature = "postgres")]
139	pub async fn connect_postgres(url: &str) -> Result<Self, anyhow::Error> {
140		let inner = BackendsConnection::connect_postgres(url).await?;
141		Ok(Self {
142			backend: DatabaseBackend::Postgres,
143			inner,
144		})
145	}
146
147	/// Connect to a MySQL database
148	///
149	/// # Examples
150	///
151	/// ```no_run
152	/// # async fn example() {
153	/// use reinhardt_db::orm::connection::DatabaseConnection;
154	///
155	/// let conn = DatabaseConnection::connect_mysql("mysql://localhost/mydb").await.unwrap();
156	/// # }
157	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
158	/// ```
159	#[cfg(feature = "mysql")]
160	pub async fn connect_mysql(url: &str) -> Result<Self, anyhow::Error> {
161		let inner = BackendsConnection::connect_mysql(url).await?;
162		Ok(Self {
163			backend: DatabaseBackend::MySql,
164			inner,
165		})
166	}
167
168	/// Connect to a SQLite database
169	///
170	/// # Examples
171	///
172	/// ```no_run
173	/// # async fn example() {
174	/// use reinhardt_db::orm::connection::DatabaseConnection;
175	///
176	/// let conn = DatabaseConnection::connect_sqlite("sqlite::memory:").await.unwrap();
177	/// # }
178	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
179	/// ```
180	#[cfg(feature = "sqlite")]
181	pub async fn connect_sqlite(url: &str) -> Result<Self, anyhow::Error> {
182		let inner = BackendsConnection::connect_sqlite(url).await?;
183		Ok(Self {
184			backend: DatabaseBackend::Sqlite,
185			inner,
186		})
187	}
188
189	/// Connect to a database with a specific connection pool size
190	///
191	/// # Arguments
192	///
193	/// * `url` - Database connection URL
194	/// * `pool_size` - Maximum number of connections in the pool (None = use default)
195	///
196	/// # Examples
197	///
198	/// ```no_run
199	/// # async fn example() {
200	/// use reinhardt_db::orm::connection::DatabaseConnection;
201	///
202	/// // Use larger pool for high-concurrency scenarios
203	/// let conn = DatabaseConnection::connect_with_pool_size(
204	///     "postgres://localhost/mydb",
205	///     Some(50)
206	/// ).await.unwrap();
207	/// # }
208	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
209	/// ```
210	// Allow unused_variables because pool_size is only used with Postgres backend.
211	// MySQL and SQLite backends don't support pool size configuration yet.
212	#[allow(unused_variables)]
213	pub async fn connect_with_pool_size(
214		url: &str,
215		pool_size: Option<u32>,
216	) -> Result<Self, anyhow::Error> {
217		let backend_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
218			DatabaseBackend::Postgres
219		} else if url.starts_with("mysql://") {
220			DatabaseBackend::MySql
221		} else if url.starts_with("sqlite://") || url.starts_with("sqlite:") {
222			DatabaseBackend::Sqlite
223		} else {
224			return Err(anyhow::anyhow!("Unsupported database URL scheme: {}", url));
225		};
226
227		#[cfg(feature = "postgres")]
228		if backend_type == DatabaseBackend::Postgres {
229			let inner = BackendsConnection::connect_postgres_with_pool_size(url, pool_size).await?;
230			return Ok(Self {
231				backend: backend_type,
232				inner,
233			});
234		}
235
236		#[cfg(feature = "mysql")]
237		if backend_type == DatabaseBackend::MySql {
238			let inner = BackendsConnection::connect_mysql(url).await?;
239			return Ok(Self {
240				backend: backend_type,
241				inner,
242			});
243		}
244
245		#[cfg(feature = "sqlite")]
246		if backend_type == DatabaseBackend::Sqlite {
247			let inner = BackendsConnection::connect_sqlite(url).await?;
248			return Ok(Self {
249				backend: backend_type,
250				inner,
251			});
252		}
253
254		Err(anyhow::anyhow!(
255			"Database backend not compiled in. Enable the '{}' feature.",
256			match backend_type {
257				DatabaseBackend::Postgres => "postgres",
258				DatabaseBackend::MySql => "mysql",
259				DatabaseBackend::Sqlite => "sqlite",
260			}
261		))
262	}
263
264	/// Performs the backend operation.
265	pub fn backend(&self) -> DatabaseBackend {
266		self.backend
267	}
268
269	/// Get a reference to the inner backends connection
270	///
271	/// This provides access to the low-level connection for operations
272	/// that require direct database access.
273	pub fn inner(&self) -> &BackendsConnection {
274		&self.inner
275	}
276
277	/// Consume self and return the inner backends connection
278	///
279	/// This is useful when you need to pass ownership of the connection
280	/// to functions that expect a `BackendsConnection`.
281	pub fn into_inner(self) -> BackendsConnection {
282		self.inner
283	}
284
285	/// Execute a SQL query and return a single row
286	pub async fn query_one(
287		&self,
288		sql: &str,
289		params: Vec<QueryValue>,
290	) -> Result<QueryRow, anyhow::Error> {
291		let row = self.inner.fetch_one(sql, params).await?;
292		Ok(QueryRow::from_backend_row(row))
293	}
294
295	/// Execute a SQL query and return an optional row
296	pub async fn query_optional(
297		&self,
298		sql: &str,
299		params: Vec<QueryValue>,
300	) -> Result<Option<QueryRow>, anyhow::Error> {
301		match self.inner.fetch_one(sql, params).await {
302			Ok(row) => Ok(Some(QueryRow::from_backend_row(row))),
303			Err(_) => Ok(None),
304		}
305	}
306
307	/// Execute a SQL statement (INSERT, UPDATE, DELETE, etc.)
308	pub async fn execute(&self, sql: &str, params: Vec<QueryValue>) -> Result<u64, anyhow::Error> {
309		let result = self.inner.execute(sql, params).await?;
310		Ok(result.rows_affected)
311	}
312
313	/// Execute a SQL query and return all rows
314	pub async fn query(
315		&self,
316		sql: &str,
317		params: Vec<QueryValue>,
318	) -> Result<Vec<QueryRow>, anyhow::Error> {
319		let rows = self.inner.fetch_all(sql, params).await?;
320		Ok(rows.into_iter().map(QueryRow::from_backend_row).collect())
321	}
322
323	/// Begin a database transaction
324	///
325	/// # Examples
326	///
327	/// ```no_run
328	/// # async fn example() {
329	/// use reinhardt_db::orm::connection::DatabaseConnection;
330	///
331	/// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
332	/// let result = conn.begin_transaction().await;
333	/// assert!(result.is_ok());
334	/// # }
335	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
336	/// ```
337	pub async fn begin_transaction(&self) -> Result<(), anyhow::Error> {
338		self.execute("BEGIN TRANSACTION", vec![]).await?;
339		Ok(())
340	}
341
342	/// Begin a transaction with a specific isolation level
343	///
344	/// # Examples
345	///
346	/// ```no_run
347	/// # async fn example() {
348	/// use reinhardt_db::orm::connection::DatabaseConnection;
349	/// use reinhardt_db::orm::transaction::IsolationLevel;
350	///
351	/// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
352	/// let result = conn.begin_transaction_with_isolation(IsolationLevel::Serializable).await;
353	/// assert!(result.is_ok());
354	/// # }
355	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
356	/// ```
357	pub async fn begin_transaction_with_isolation(
358		&self,
359		level: super::transaction::IsolationLevel,
360	) -> Result<(), anyhow::Error> {
361		let sql = format!("BEGIN TRANSACTION ISOLATION LEVEL {}", level.to_sql());
362		self.execute(&sql, vec![]).await?;
363		Ok(())
364	}
365
366	/// Commit the current transaction
367	///
368	/// # Examples
369	///
370	/// ```no_run
371	/// # async fn example() {
372	/// use reinhardt_db::orm::connection::DatabaseConnection;
373	///
374	/// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
375	/// conn.begin_transaction().await.unwrap();
376	/// // ... perform operations ...
377	/// let result = conn.commit_transaction().await;
378	/// assert!(result.is_ok());
379	/// # }
380	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
381	/// ```
382	pub async fn commit_transaction(&self) -> Result<(), anyhow::Error> {
383		self.execute("COMMIT", vec![]).await?;
384		Ok(())
385	}
386
387	/// Rollback the current transaction
388	///
389	/// # Examples
390	///
391	/// ```no_run
392	/// # async fn example() {
393	/// use reinhardt_db::orm::connection::DatabaseConnection;
394	///
395	/// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
396	/// conn.begin_transaction().await.unwrap();
397	/// // ... error occurs ...
398	/// let result = conn.rollback_transaction().await;
399	/// assert!(result.is_ok());
400	/// # }
401	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
402	/// ```
403	pub async fn rollback_transaction(&self) -> Result<(), anyhow::Error> {
404		self.execute("ROLLBACK", vec![]).await?;
405		Ok(())
406	}
407
408	/// Create a savepoint for nested transactions
409	///
410	/// # Examples
411	///
412	/// ```no_run
413	/// # async fn example() {
414	/// use reinhardt_db::orm::connection::DatabaseConnection;
415	///
416	/// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
417	/// conn.begin_transaction().await.unwrap();
418	/// let result = conn.savepoint("sp1").await;
419	/// assert!(result.is_ok());
420	/// // ... nested operations ...
421	/// conn.release_savepoint("sp1").await.unwrap();
422	/// # }
423	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
424	/// ```
425	pub async fn savepoint(&self, name: &str) -> Result<(), anyhow::Error> {
426		let sql = format!("SAVEPOINT {}", name);
427		self.execute(&sql, vec![]).await?;
428		Ok(())
429	}
430
431	/// Release a savepoint
432	pub async fn release_savepoint(&self, name: &str) -> Result<(), anyhow::Error> {
433		let sql = format!("RELEASE SAVEPOINT {}", name);
434		self.execute(&sql, vec![]).await?;
435		Ok(())
436	}
437
438	/// Rollback to a savepoint
439	pub async fn rollback_to_savepoint(&self, name: &str) -> Result<(), anyhow::Error> {
440		let sql = format!("ROLLBACK TO SAVEPOINT {}", name);
441		self.execute(&sql, vec![]).await?;
442		Ok(())
443	}
444
445	/// Begin a database transaction and return a dedicated executor
446	///
447	/// This method acquires a dedicated database connection and begins a
448	/// transaction on it. All queries executed through the returned
449	/// `TransactionExecutor` are guaranteed to run on the same physical
450	/// connection, ensuring proper transaction isolation.
451	///
452	/// # Returns
453	///
454	/// A boxed `TransactionExecutor` that holds the dedicated connection
455	/// and provides methods for executing queries within the transaction.
456	///
457	/// # Example
458	///
459	/// ```no_run
460	/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
461	/// use reinhardt_db::orm::connection::DatabaseConnection;
462	///
463	/// let conn = DatabaseConnection::connect("postgres://localhost/mydb").await?;
464	/// let mut tx = conn.begin().await?;
465	///
466	/// tx.execute("INSERT INTO users (name) VALUES ($1)", vec!["Alice".into()]).await?;
467	/// tx.commit().await?;
468	/// # Ok(())
469	/// # }
470	/// ```
471	pub async fn begin(&self) -> Result<Box<dyn TransactionExecutor>, anyhow::Error> {
472		Ok(self.inner.begin().await?)
473	}
474
475	/// Begin a transaction with a specific isolation level using TransactionExecutor
476	///
477	/// This method returns a `TransactionExecutor` that provides dedicated connection
478	/// semantics with the specified isolation level. All queries executed through
479	/// the returned executor are guaranteed to run on the same physical connection.
480	///
481	/// # Arguments
482	///
483	/// * `level` - The desired isolation level for the transaction
484	///
485	/// # Examples
486	///
487	/// ```no_run
488	/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
489	/// use reinhardt_db::orm::connection::{DatabaseConnection, IsolationLevel};
490	///
491	/// let conn = DatabaseConnection::connect("postgres://localhost/mydb").await?;
492	/// let mut tx = conn.begin_with_isolation(IsolationLevel::Serializable).await?;
493	///
494	/// tx.execute("INSERT INTO users (name) VALUES ($1)", vec!["Alice".into()]).await?;
495	/// tx.commit().await?;
496	/// # Ok(())
497	/// # }
498	/// ```
499	pub async fn begin_with_isolation(
500		&self,
501		level: IsolationLevel,
502	) -> Result<Box<dyn TransactionExecutor>, anyhow::Error> {
503		Ok(self.inner.begin_with_isolation(level).await?)
504	}
505}
506
507#[async_trait]
508impl DatabaseExecutor for DatabaseConnection {
509	async fn execute(&self, sql: &str) -> Result<u64, anyhow::Error> {
510		self.execute(sql, vec![]).await
511	}
512
513	async fn query(&self, sql: &str) -> Result<Vec<QueryRow>, anyhow::Error> {
514		self.query(sql, vec![]).await
515	}
516}
517
518/// Injectable implementation for DatabaseConnection
519///
520/// DatabaseConnection must be explicitly registered in the DI context using
521/// `InjectionContextBuilder::singleton()`. It cannot be auto-injected because
522/// it requires runtime configuration (connection URL, pool settings, etc.).
523///
524/// # Example
525///
526/// ```rust,no_run
527/// use reinhardt_db::orm::DatabaseConnection;
528/// use reinhardt_di::InjectionContext;
529///
530/// # async fn example() {
531/// // First, establish a database connection
532/// let db = DatabaseConnection::connect("postgres://localhost/mydb").await.unwrap();
533///
534/// // Then register it in the DI context as a singleton
535/// let singleton_scope = reinhardt_di::SingletonScope::new();
536/// let ctx = InjectionContext::builder(singleton_scope)
537///     .singleton(db)
538///     .build();
539/// # }
540/// ```
541#[cfg(feature = "di")]
542#[async_trait]
543impl reinhardt_di::Injectable for DatabaseConnection {
544	async fn inject(ctx: &reinhardt_di::InjectionContext) -> reinhardt_di::DiResult<Self> {
545		// Try singleton scope first (primary expected location)
546		if let Some(conn) = ctx.get_singleton::<Self>() {
547			return Ok(std::sync::Arc::try_unwrap(conn).unwrap_or_else(|arc| (*arc).clone()));
548		}
549
550		// Try request scope as fallback
551		if let Some(conn) = ctx.get_request::<Self>() {
552			return Ok(std::sync::Arc::try_unwrap(conn).unwrap_or_else(|arc| (*arc).clone()));
553		}
554
555		// Not registered - provide helpful error
556		Err(reinhardt_di::DiError::NotRegistered {
557			type_name: std::any::type_name::<Self>().to_string(),
558			hint: "Use InjectionContextBuilder::singleton(db_connection) to register a \
559			       DatabaseConnection. Create it with DatabaseConnection::connect(), \
560			       connect_postgres(), connect_sqlite(), or connect_mysql()."
561				.to_string(),
562		})
563	}
564
565	async fn inject_uncached(ctx: &reinhardt_di::InjectionContext) -> reinhardt_di::DiResult<Self> {
566		// For DatabaseConnection, inject_uncached behaves the same as inject
567		// since database connections are typically shared (singleton or request-scoped)
568		Self::inject(ctx).await
569	}
570}