reinhardt_db/orm/connection.rs
1//! Database connection management
2//!
3//! This module provides the main `DatabaseConnection` type which wraps
4//! the backend-specific connection implementations.
5
6use async_trait::async_trait;
7
8/// Re-export backends types
9pub use crate::backends::connection::DatabaseConnection as BackendsConnection;
10pub use crate::backends::types::{IsolationLevel, QueryValue, Row, TransactionExecutor};
11
/// Identifies which database engine a connection targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatabaseBackend {
    /// PostgreSQL.
    Postgres,
    /// MySQL.
    MySql,
    /// SQLite.
    Sqlite,
}
22
23/// Query row wrapper for ORM compatibility
24#[derive(serde::Serialize)]
25pub struct QueryRow {
26 /// The data.
27 pub data: serde_json::Value,
28 // Allow dead_code: field reserved for future connection metadata tracking
29 #[allow(dead_code)]
30 #[serde(skip)]
31 inner: Option<Row>,
32}
33
34impl QueryRow {
35 /// Creates a new instance.
36 pub fn new(data: serde_json::Value) -> Self {
37 Self { data, inner: None }
38 }
39
40 /// Creates an instance from backend row.
41 pub fn from_backend_row(row: Row) -> Self {
42 // Convert Row to JSON for backward compatibility
43 let mut map = serde_json::Map::new();
44 for (key, value) in row.data.iter() {
45 let json_value = match value.clone() {
46 QueryValue::Null => serde_json::Value::Null,
47 QueryValue::Bool(b) => serde_json::Value::Bool(b),
48 QueryValue::Int(i) => serde_json::Value::Number(i.into()),
49 QueryValue::Float(f) => serde_json::Number::from_f64(f)
50 .map(serde_json::Value::Number)
51 .unwrap_or(serde_json::Value::Null),
52 QueryValue::String(s) => serde_json::Value::String(s),
53 QueryValue::Bytes(b) => {
54 // Encode bytes as base64 string
55 use base64::Engine;
56 serde_json::Value::String(base64::engine::general_purpose::STANDARD.encode(&b))
57 }
58 QueryValue::Timestamp(dt) => serde_json::Value::String(dt.to_rfc3339()),
59 QueryValue::Uuid(u) => serde_json::Value::String(u.to_string()),
60 // NOW() should never appear in Row data (it's resolved to actual timestamp in database)
61 QueryValue::Now => panic!("QueryValue::Now should not appear in Row data"),
62 };
63 map.insert(key.clone(), json_value);
64 }
65
66 Self {
67 data: serde_json::Value::Object(map),
68 inner: Some(row),
69 }
70 }
71
72 /// Get a value from the row by column name
73 ///
74 /// This method extracts a value from the row's JSON data by key.
75 /// Supports common types like i64, f64, bool, and String.
76 pub fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
77 self.data
78 .get(key)
79 .and_then(|v| serde_json::from_value(v.clone()).ok())
80 }
81}
82
83#[async_trait]
84/// Trait defining database executor behavior.
85pub trait DatabaseExecutor: Send + Sync {
86 /// Executes a SQL statement and returns the number of affected rows.
87 async fn execute(&self, sql: &str) -> Result<u64, anyhow::Error>;
88 /// Executes a SQL query and returns the resulting rows.
89 async fn query(&self, sql: &str) -> Result<Vec<QueryRow>, anyhow::Error>;
90}
91
92/// Database connection wrapper
93#[derive(Clone)]
94pub struct DatabaseConnection {
95 backend: DatabaseBackend,
96 inner: BackendsConnection,
97}
98
99impl DatabaseConnection {
100 /// Creates a new instance.
101 pub fn new(backend: DatabaseBackend, inner: BackendsConnection) -> Self {
102 Self { backend, inner }
103 }
104
105 /// Connect to a database from a connection URL
106 ///
107 /// Automatically detects the database type from the URL scheme:
108 /// - `postgres://` or `postgresql://` → PostgreSQL
109 /// - `mysql://` → MySQL
110 /// - `sqlite://` or `sqlite:` → SQLite
111 ///
112 /// # Examples
113 ///
114 /// ```no_run
115 /// # async fn example() {
116 /// use reinhardt_db::orm::connection::DatabaseConnection;
117 ///
118 /// let conn = DatabaseConnection::connect("postgres://localhost/mydb").await.unwrap();
119 /// # }
120 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
121 /// ```
122 pub async fn connect(url: &str) -> Result<Self, anyhow::Error> {
123 Self::connect_with_pool_size(url, None).await
124 }
125
126 /// Connect to a PostgreSQL database
127 ///
128 /// # Examples
129 ///
130 /// ```no_run
131 /// # async fn example() {
132 /// use reinhardt_db::orm::connection::DatabaseConnection;
133 ///
134 /// let conn = DatabaseConnection::connect_postgres("postgres://localhost/mydb").await.unwrap();
135 /// # }
136 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
137 /// ```
138 #[cfg(feature = "postgres")]
139 pub async fn connect_postgres(url: &str) -> Result<Self, anyhow::Error> {
140 let inner = BackendsConnection::connect_postgres(url).await?;
141 Ok(Self {
142 backend: DatabaseBackend::Postgres,
143 inner,
144 })
145 }
146
147 /// Connect to a MySQL database
148 ///
149 /// # Examples
150 ///
151 /// ```no_run
152 /// # async fn example() {
153 /// use reinhardt_db::orm::connection::DatabaseConnection;
154 ///
155 /// let conn = DatabaseConnection::connect_mysql("mysql://localhost/mydb").await.unwrap();
156 /// # }
157 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
158 /// ```
159 #[cfg(feature = "mysql")]
160 pub async fn connect_mysql(url: &str) -> Result<Self, anyhow::Error> {
161 let inner = BackendsConnection::connect_mysql(url).await?;
162 Ok(Self {
163 backend: DatabaseBackend::MySql,
164 inner,
165 })
166 }
167
168 /// Connect to a SQLite database
169 ///
170 /// # Examples
171 ///
172 /// ```no_run
173 /// # async fn example() {
174 /// use reinhardt_db::orm::connection::DatabaseConnection;
175 ///
176 /// let conn = DatabaseConnection::connect_sqlite("sqlite::memory:").await.unwrap();
177 /// # }
178 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
179 /// ```
180 #[cfg(feature = "sqlite")]
181 pub async fn connect_sqlite(url: &str) -> Result<Self, anyhow::Error> {
182 let inner = BackendsConnection::connect_sqlite(url).await?;
183 Ok(Self {
184 backend: DatabaseBackend::Sqlite,
185 inner,
186 })
187 }
188
189 /// Connect to a database with a specific connection pool size
190 ///
191 /// # Arguments
192 ///
193 /// * `url` - Database connection URL
194 /// * `pool_size` - Maximum number of connections in the pool (None = use default)
195 ///
196 /// # Examples
197 ///
198 /// ```no_run
199 /// # async fn example() {
200 /// use reinhardt_db::orm::connection::DatabaseConnection;
201 ///
202 /// // Use larger pool for high-concurrency scenarios
203 /// let conn = DatabaseConnection::connect_with_pool_size(
204 /// "postgres://localhost/mydb",
205 /// Some(50)
206 /// ).await.unwrap();
207 /// # }
208 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
209 /// ```
210 // Allow unused_variables because pool_size is only used with Postgres backend.
211 // MySQL and SQLite backends don't support pool size configuration yet.
212 #[allow(unused_variables)]
213 pub async fn connect_with_pool_size(
214 url: &str,
215 pool_size: Option<u32>,
216 ) -> Result<Self, anyhow::Error> {
217 let backend_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
218 DatabaseBackend::Postgres
219 } else if url.starts_with("mysql://") {
220 DatabaseBackend::MySql
221 } else if url.starts_with("sqlite://") || url.starts_with("sqlite:") {
222 DatabaseBackend::Sqlite
223 } else {
224 return Err(anyhow::anyhow!("Unsupported database URL scheme: {}", url));
225 };
226
227 #[cfg(feature = "postgres")]
228 if backend_type == DatabaseBackend::Postgres {
229 let inner = BackendsConnection::connect_postgres_with_pool_size(url, pool_size).await?;
230 return Ok(Self {
231 backend: backend_type,
232 inner,
233 });
234 }
235
236 #[cfg(feature = "mysql")]
237 if backend_type == DatabaseBackend::MySql {
238 let inner = BackendsConnection::connect_mysql(url).await?;
239 return Ok(Self {
240 backend: backend_type,
241 inner,
242 });
243 }
244
245 #[cfg(feature = "sqlite")]
246 if backend_type == DatabaseBackend::Sqlite {
247 let inner = BackendsConnection::connect_sqlite(url).await?;
248 return Ok(Self {
249 backend: backend_type,
250 inner,
251 });
252 }
253
254 Err(anyhow::anyhow!(
255 "Database backend not compiled in. Enable the '{}' feature.",
256 match backend_type {
257 DatabaseBackend::Postgres => "postgres",
258 DatabaseBackend::MySql => "mysql",
259 DatabaseBackend::Sqlite => "sqlite",
260 }
261 ))
262 }
263
264 /// Performs the backend operation.
265 pub fn backend(&self) -> DatabaseBackend {
266 self.backend
267 }
268
269 /// Get a reference to the inner backends connection
270 ///
271 /// This provides access to the low-level connection for operations
272 /// that require direct database access.
273 pub fn inner(&self) -> &BackendsConnection {
274 &self.inner
275 }
276
277 /// Consume self and return the inner backends connection
278 ///
279 /// This is useful when you need to pass ownership of the connection
280 /// to functions that expect a `BackendsConnection`.
281 pub fn into_inner(self) -> BackendsConnection {
282 self.inner
283 }
284
285 /// Execute a SQL query and return a single row
286 pub async fn query_one(
287 &self,
288 sql: &str,
289 params: Vec<QueryValue>,
290 ) -> Result<QueryRow, anyhow::Error> {
291 let row = self.inner.fetch_one(sql, params).await?;
292 Ok(QueryRow::from_backend_row(row))
293 }
294
295 /// Execute a SQL query and return an optional row
296 pub async fn query_optional(
297 &self,
298 sql: &str,
299 params: Vec<QueryValue>,
300 ) -> Result<Option<QueryRow>, anyhow::Error> {
301 match self.inner.fetch_one(sql, params).await {
302 Ok(row) => Ok(Some(QueryRow::from_backend_row(row))),
303 Err(_) => Ok(None),
304 }
305 }
306
307 /// Execute a SQL statement (INSERT, UPDATE, DELETE, etc.)
308 pub async fn execute(&self, sql: &str, params: Vec<QueryValue>) -> Result<u64, anyhow::Error> {
309 let result = self.inner.execute(sql, params).await?;
310 Ok(result.rows_affected)
311 }
312
313 /// Execute a SQL query and return all rows
314 pub async fn query(
315 &self,
316 sql: &str,
317 params: Vec<QueryValue>,
318 ) -> Result<Vec<QueryRow>, anyhow::Error> {
319 let rows = self.inner.fetch_all(sql, params).await?;
320 Ok(rows.into_iter().map(QueryRow::from_backend_row).collect())
321 }
322
323 /// Begin a database transaction
324 ///
325 /// # Examples
326 ///
327 /// ```no_run
328 /// # async fn example() {
329 /// use reinhardt_db::orm::connection::DatabaseConnection;
330 ///
331 /// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
332 /// let result = conn.begin_transaction().await;
333 /// assert!(result.is_ok());
334 /// # }
335 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
336 /// ```
337 pub async fn begin_transaction(&self) -> Result<(), anyhow::Error> {
338 self.execute("BEGIN TRANSACTION", vec![]).await?;
339 Ok(())
340 }
341
342 /// Begin a transaction with a specific isolation level
343 ///
344 /// # Examples
345 ///
346 /// ```no_run
347 /// # async fn example() {
348 /// use reinhardt_db::orm::connection::DatabaseConnection;
349 /// use reinhardt_db::orm::transaction::IsolationLevel;
350 ///
351 /// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
352 /// let result = conn.begin_transaction_with_isolation(IsolationLevel::Serializable).await;
353 /// assert!(result.is_ok());
354 /// # }
355 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
356 /// ```
357 pub async fn begin_transaction_with_isolation(
358 &self,
359 level: super::transaction::IsolationLevel,
360 ) -> Result<(), anyhow::Error> {
361 let sql = format!("BEGIN TRANSACTION ISOLATION LEVEL {}", level.to_sql());
362 self.execute(&sql, vec![]).await?;
363 Ok(())
364 }
365
366 /// Commit the current transaction
367 ///
368 /// # Examples
369 ///
370 /// ```no_run
371 /// # async fn example() {
372 /// use reinhardt_db::orm::connection::DatabaseConnection;
373 ///
374 /// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
375 /// conn.begin_transaction().await.unwrap();
376 /// // ... perform operations ...
377 /// let result = conn.commit_transaction().await;
378 /// assert!(result.is_ok());
379 /// # }
380 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
381 /// ```
382 pub async fn commit_transaction(&self) -> Result<(), anyhow::Error> {
383 self.execute("COMMIT", vec![]).await?;
384 Ok(())
385 }
386
387 /// Rollback the current transaction
388 ///
389 /// # Examples
390 ///
391 /// ```no_run
392 /// # async fn example() {
393 /// use reinhardt_db::orm::connection::DatabaseConnection;
394 ///
395 /// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
396 /// conn.begin_transaction().await.unwrap();
397 /// // ... error occurs ...
398 /// let result = conn.rollback_transaction().await;
399 /// assert!(result.is_ok());
400 /// # }
401 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
402 /// ```
403 pub async fn rollback_transaction(&self) -> Result<(), anyhow::Error> {
404 self.execute("ROLLBACK", vec![]).await?;
405 Ok(())
406 }
407
408 /// Create a savepoint for nested transactions
409 ///
410 /// # Examples
411 ///
412 /// ```no_run
413 /// # async fn example() {
414 /// use reinhardt_db::orm::connection::DatabaseConnection;
415 ///
416 /// let conn = DatabaseConnection::connect("sqlite::memory:").await.unwrap();
417 /// conn.begin_transaction().await.unwrap();
418 /// let result = conn.savepoint("sp1").await;
419 /// assert!(result.is_ok());
420 /// // ... nested operations ...
421 /// conn.release_savepoint("sp1").await.unwrap();
422 /// # }
423 /// # tokio::runtime::Runtime::new().unwrap().block_on(example());
424 /// ```
425 pub async fn savepoint(&self, name: &str) -> Result<(), anyhow::Error> {
426 let sql = format!("SAVEPOINT {}", name);
427 self.execute(&sql, vec![]).await?;
428 Ok(())
429 }
430
431 /// Release a savepoint
432 pub async fn release_savepoint(&self, name: &str) -> Result<(), anyhow::Error> {
433 let sql = format!("RELEASE SAVEPOINT {}", name);
434 self.execute(&sql, vec![]).await?;
435 Ok(())
436 }
437
438 /// Rollback to a savepoint
439 pub async fn rollback_to_savepoint(&self, name: &str) -> Result<(), anyhow::Error> {
440 let sql = format!("ROLLBACK TO SAVEPOINT {}", name);
441 self.execute(&sql, vec![]).await?;
442 Ok(())
443 }
444
445 /// Begin a database transaction and return a dedicated executor
446 ///
447 /// This method acquires a dedicated database connection and begins a
448 /// transaction on it. All queries executed through the returned
449 /// `TransactionExecutor` are guaranteed to run on the same physical
450 /// connection, ensuring proper transaction isolation.
451 ///
452 /// # Returns
453 ///
454 /// A boxed `TransactionExecutor` that holds the dedicated connection
455 /// and provides methods for executing queries within the transaction.
456 ///
457 /// # Example
458 ///
459 /// ```no_run
460 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
461 /// use reinhardt_db::orm::connection::DatabaseConnection;
462 ///
463 /// let conn = DatabaseConnection::connect("postgres://localhost/mydb").await?;
464 /// let mut tx = conn.begin().await?;
465 ///
466 /// tx.execute("INSERT INTO users (name) VALUES ($1)", vec!["Alice".into()]).await?;
467 /// tx.commit().await?;
468 /// # Ok(())
469 /// # }
470 /// ```
471 pub async fn begin(&self) -> Result<Box<dyn TransactionExecutor>, anyhow::Error> {
472 Ok(self.inner.begin().await?)
473 }
474
475 /// Begin a transaction with a specific isolation level using TransactionExecutor
476 ///
477 /// This method returns a `TransactionExecutor` that provides dedicated connection
478 /// semantics with the specified isolation level. All queries executed through
479 /// the returned executor are guaranteed to run on the same physical connection.
480 ///
481 /// # Arguments
482 ///
483 /// * `level` - The desired isolation level for the transaction
484 ///
485 /// # Examples
486 ///
487 /// ```no_run
488 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
489 /// use reinhardt_db::orm::connection::{DatabaseConnection, IsolationLevel};
490 ///
491 /// let conn = DatabaseConnection::connect("postgres://localhost/mydb").await?;
492 /// let mut tx = conn.begin_with_isolation(IsolationLevel::Serializable).await?;
493 ///
494 /// tx.execute("INSERT INTO users (name) VALUES ($1)", vec!["Alice".into()]).await?;
495 /// tx.commit().await?;
496 /// # Ok(())
497 /// # }
498 /// ```
499 pub async fn begin_with_isolation(
500 &self,
501 level: IsolationLevel,
502 ) -> Result<Box<dyn TransactionExecutor>, anyhow::Error> {
503 Ok(self.inner.begin_with_isolation(level).await?)
504 }
505}
506
507#[async_trait]
508impl DatabaseExecutor for DatabaseConnection {
509 async fn execute(&self, sql: &str) -> Result<u64, anyhow::Error> {
510 self.execute(sql, vec![]).await
511 }
512
513 async fn query(&self, sql: &str) -> Result<Vec<QueryRow>, anyhow::Error> {
514 self.query(sql, vec![]).await
515 }
516}
517
/// Injectable implementation for DatabaseConnection
///
/// A `DatabaseConnection` is never constructed by the DI container: it needs
/// runtime configuration (connection URL, pool settings, etc.), so it has to
/// be registered explicitly with `InjectionContextBuilder::singleton()`.
///
/// # Example
///
/// ```rust,no_run
/// use reinhardt_db::orm::DatabaseConnection;
/// use reinhardt_di::InjectionContext;
///
/// # async fn example() {
/// // First, establish a database connection
/// let db = DatabaseConnection::connect("postgres://localhost/mydb").await.unwrap();
///
/// // Then register it in the DI context as a singleton
/// let singleton_scope = reinhardt_di::SingletonScope::new();
/// let ctx = InjectionContext::builder(singleton_scope)
///     .singleton(db)
///     .build();
/// # }
/// ```
#[cfg(feature = "di")]
#[async_trait]
impl reinhardt_di::Injectable for DatabaseConnection {
    /// Resolves a previously registered connection from `ctx`.
    ///
    /// The singleton scope is consulted first and the request scope second;
    /// if neither holds a connection, a `NotRegistered` error with a usage
    /// hint is returned.
    async fn inject(ctx: &reinhardt_di::InjectionContext) -> reinhardt_di::DiResult<Self> {
        // Singleton scope is the primary expected location; the request scope
        // is only queried when the singleton lookup comes back empty.
        let registered = ctx
            .get_singleton::<Self>()
            .or_else(|| ctx.get_request::<Self>());

        match registered {
            // Take the value out of the Arc when this is the last reference,
            // otherwise hand back a clone of the shared connection.
            Some(shared) => Ok(std::sync::Arc::try_unwrap(shared)
                .unwrap_or_else(|still_shared| (*still_shared).clone())),
            // Not registered - provide helpful error
            None => Err(reinhardt_di::DiError::NotRegistered {
                type_name: std::any::type_name::<Self>().to_string(),
                hint: "Use InjectionContextBuilder::singleton(db_connection) to register a \
                       DatabaseConnection. Create it with DatabaseConnection::connect(), \
                       connect_postgres(), connect_sqlite(), or connect_mysql()."
                    .to_string(),
            }),
        }
    }

    /// Resolves the connection exactly like `inject`.
    ///
    /// Database connections are typically shared (singleton or
    /// request-scoped), so there is no separate uncached path.
    async fn inject_uncached(ctx: &reinhardt_di::InjectionContext) -> reinhardt_di::DiResult<Self> {
        <Self as reinhardt_di::Injectable>::inject(ctx).await
    }
}