shaperail_runtime/db/
manager.rs1use indexmap::IndexMap;
7use shaperail_core::{DatabaseEngine, NamedDatabaseConfig, ShaperailError};
8use std::sync::Arc;
9
10#[derive(Clone)]
13pub struct SqlConnection {
14 pub inner: Arc<sea_orm::DatabaseConnection>,
16 pub engine: DatabaseEngine,
18}
19
20impl SqlConnection {
21 pub fn backend(&self) -> sea_orm::DatabaseBackend {
23 match self.engine {
24 DatabaseEngine::Postgres => sea_orm::DatabaseBackend::Postgres,
25 DatabaseEngine::MySQL => sea_orm::DatabaseBackend::MySql,
26 DatabaseEngine::SQLite => sea_orm::DatabaseBackend::Sqlite,
27 DatabaseEngine::MongoDB => {
28 sea_orm::DatabaseBackend::Postgres
31 }
32 }
33 }
34
35 pub fn quote_ident(&self, name: &str) -> String {
37 match self.engine {
38 DatabaseEngine::MySQL => format!("`{name}`"),
39 _ => format!("\"{name}\""),
40 }
41 }
42
43 pub fn param(&self, index: usize) -> String {
46 match self.engine {
47 DatabaseEngine::Postgres => format!("${index}"),
48 _ => "?".to_string(),
49 }
50 }
51}
52
53pub struct DatabaseManager {
57 connections: IndexMap<String, SqlConnection>,
59}
60
61impl DatabaseManager {
62 pub async fn from_url(url: &str, pool_size: u32) -> Result<Self, ShaperailError> {
66 let mut connections = IndexMap::new();
67 let mut opt = sea_orm::ConnectOptions::new(url.to_string());
68 opt.max_connections(pool_size).min_connections(1);
69 let conn = sea_orm::Database::connect(opt)
70 .await
71 .map_err(|e| ShaperailError::Internal(format!("Failed to connect to database: {e}")))?;
72 connections.insert(
73 "default".to_string(),
74 SqlConnection {
75 inner: Arc::new(conn),
76 engine: DatabaseEngine::Postgres,
77 },
78 );
79 Ok(Self { connections })
80 }
81
82 pub async fn from_named_config(
87 databases: &IndexMap<String, NamedDatabaseConfig>,
88 ) -> Result<Self, ShaperailError> {
89 let mut connections = IndexMap::new();
90 for (name, cfg) in databases {
91 if !cfg.engine.is_sql() {
92 continue;
93 }
94 let url = &cfg.url;
95 let mut opt = sea_orm::ConnectOptions::new(url.clone());
96 opt.max_connections(cfg.pool_size).min_connections(1);
97 let conn = sea_orm::Database::connect(opt).await.map_err(|e| {
98 ShaperailError::Internal(format!("Failed to connect to database '{name}': {e}"))
99 })?;
100 if cfg.engine == DatabaseEngine::SQLite {
102 use sea_orm::ConnectionTrait;
103 conn.execute(sea_orm::Statement::from_string(
104 sea_orm::DatabaseBackend::Sqlite,
105 "PRAGMA journal_mode=WAL".to_string(),
106 ))
107 .await
108 .map_err(|e| {
109 ShaperailError::Internal(format!(
110 "Failed to enable WAL mode for SQLite '{name}': {e}"
111 ))
112 })?;
113 tracing::info!("SQLite '{name}' WAL mode enabled");
114 }
115 connections.insert(
116 name.clone(),
117 SqlConnection {
118 inner: Arc::new(conn),
119 engine: cfg.engine,
120 },
121 );
122 }
123 if connections.is_empty() {
124 return Err(ShaperailError::Internal(
125 "No SQL databases configured in databases config".to_string(),
126 ));
127 }
128 Ok(Self { connections })
129 }
130
131 pub fn get_sql(&self, name: &str) -> Option<SqlConnection> {
134 self.connections.get(name).cloned()
135 }
136
137 pub const DEFAULT_NAME: &'static str = "default";
139
140 pub fn connection_name_for_resource(&self, db: Option<&String>) -> &str {
142 if let Some(name) = db {
143 if let Some((key, _)) = self.connections.get_key_value(name.as_str()) {
144 return key.as_str();
145 }
146 }
147 Self::DEFAULT_NAME
148 }
149
150 pub fn sql_for_resource(&self, db: Option<&String>) -> Option<SqlConnection> {
152 let name = self.connection_name_for_resource(db);
153 self.get_sql(name)
154 }
155
156 pub fn len(&self) -> usize {
158 self.connections.len()
159 }
160
161 pub fn is_empty(&self) -> bool {
163 self.connections.is_empty()
164 }
165
166 pub fn all_connections(&self) -> impl Iterator<Item = (&str, &SqlConnection)> {
168 self.connections
169 .iter()
170 .map(|(name, conn)| (name.as_str(), conn))
171 }
172}