systemprompt_database/services/
database.rs1use super::postgres::PostgresProvider;
6use super::provider::DatabaseProvider;
7use crate::error::{DatabaseResult, RepositoryError};
8use crate::models::{DatabaseInfo, QueryResult};
9use std::sync::Arc;
10
11pub struct Database {
12 provider: Arc<dyn DatabaseProvider>,
13 write_provider: Option<Arc<dyn DatabaseProvider>>,
14}
15
16impl std::fmt::Debug for Database {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 f.debug_struct("Database")
19 .field("backend", &"PostgreSQL")
20 .finish()
21 }
22}
23
24impl Database {
25 pub async fn new_postgres(url: &str) -> DatabaseResult<Self> {
26 let provider = PostgresProvider::new(url).await?;
27 Ok(Self {
28 provider: Arc::new(provider),
29 write_provider: None,
30 })
31 }
32
33 pub async fn from_config(db_type: &str, url: &str) -> DatabaseResult<Self> {
34 match db_type.to_lowercase().as_str() {
35 "postgres" | "postgresql" | "" => Self::new_postgres(url).await,
36 other => Err(RepositoryError::invalid_argument(format!(
37 "Unsupported database type: {other}. Only PostgreSQL is supported."
38 ))),
39 }
40 }
41
42 pub async fn from_config_with_write(
43 db_type: &str,
44 read_url: &str,
45 write_url: Option<&str>,
46 ) -> DatabaseResult<Self> {
47 let provider: Arc<dyn DatabaseProvider> = match db_type.to_lowercase().as_str() {
48 "postgres" | "postgresql" | "" => Arc::new(PostgresProvider::new(read_url).await?),
49 other => {
50 return Err(RepositoryError::invalid_argument(format!(
51 "Unsupported database type: {other}. Only PostgreSQL is supported."
52 )));
53 },
54 };
55
56 let write_provider: Option<Arc<dyn DatabaseProvider>> = match write_url {
57 Some(url) => Some(Arc::new(PostgresProvider::new(url).await?)),
58 None => None,
59 };
60
61 Ok(Self {
62 provider,
63 write_provider,
64 })
65 }
66
67 fn require_postgres(pool: Option<Arc<sqlx::PgPool>>) -> DatabaseResult<Arc<sqlx::PgPool>> {
68 pool.ok_or_else(|| RepositoryError::invalid_state("Database is not PostgreSQL"))
69 }
70
71 #[must_use]
74 pub fn read(&self) -> &dyn DatabaseProvider {
75 self.provider.as_ref()
76 }
77
78 #[must_use]
81 pub fn write(&self) -> &dyn DatabaseProvider {
82 self.write_provider
83 .as_deref()
84 .unwrap_or_else(|| self.provider.as_ref())
85 }
86
87 pub fn get_postgres_pool_arc(&self) -> DatabaseResult<Arc<sqlx::PgPool>> {
88 self.read_pool_arc()
89 }
90
91 pub fn write_pool_arc(&self) -> DatabaseResult<Arc<sqlx::PgPool>> {
92 Self::require_postgres(self.write().get_postgres_pool())
93 }
94
95 #[must_use]
96 pub fn write_pool(&self) -> Option<Arc<sqlx::PgPool>> {
97 self.write().get_postgres_pool()
98 }
99
100 #[must_use]
101 pub fn has_write_pool(&self) -> bool {
102 self.write_provider.is_some()
103 }
104
105 #[must_use]
106 pub fn write_provider(&self) -> &dyn DatabaseProvider {
107 self.write()
108 }
109
110 pub async fn query(
111 &self,
112 sql: &dyn crate::models::QuerySelector,
113 ) -> DatabaseResult<QueryResult> {
114 self.read().query_raw(sql).await
115 }
116
117 pub async fn query_with(
118 &self,
119 sql: &dyn crate::models::QuerySelector,
120 params: &[&dyn crate::models::ToDbValue],
121 ) -> DatabaseResult<QueryResult> {
122 self.read().query_raw_with(sql, params).await
123 }
124
125 pub async fn execute_batch(&self, sql: &str) -> DatabaseResult<()> {
126 self.write().execute_batch(sql).await
127 }
128
129 pub async fn get_info(&self) -> DatabaseResult<DatabaseInfo> {
130 self.read().get_database_info().await
131 }
132
133 pub async fn test_connection(&self) -> DatabaseResult<()> {
134 self.provider.test_connection().await?;
135 if let Some(wp) = &self.write_provider {
136 wp.test_connection().await?;
137 }
138 Ok(())
139 }
140
141 #[must_use]
142 pub fn get_postgres_pool(&self) -> Option<Arc<sqlx::PgPool>> {
143 self.read().get_postgres_pool()
144 }
145
146 pub fn pool_arc(&self) -> DatabaseResult<Arc<sqlx::PgPool>> {
147 self.read_pool_arc()
148 }
149
150 #[must_use]
151 pub fn pool(&self) -> Option<Arc<sqlx::PgPool>> {
152 self.read().get_postgres_pool()
153 }
154
155 #[must_use]
156 pub fn read_pool(&self) -> Option<Arc<sqlx::PgPool>> {
157 self.read().get_postgres_pool()
158 }
159
160 pub fn read_pool_arc(&self) -> DatabaseResult<Arc<sqlx::PgPool>> {
161 Self::require_postgres(self.read().get_postgres_pool())
162 }
163
164 pub async fn begin(&self) -> DatabaseResult<sqlx::Transaction<'_, sqlx::Postgres>> {
165 let pool = self.write_pool_arc()?;
166 pool.begin().await.map_err(Into::into)
167 }
168}
169
170pub type DbPool = Arc<Database>;
171
172pub trait DatabaseExt {
173 fn database(&self) -> Arc<Database>;
174}
175
176impl DatabaseExt for Arc<Database> {
177 fn database(&self) -> Arc<Database> {
178 Self::clone(self)
179 }
180}
181
182#[async_trait::async_trait]
183impl DatabaseProvider for Database {
184 fn get_postgres_pool(&self) -> Option<Arc<sqlx::PgPool>> {
185 self.read().get_postgres_pool()
186 }
187
188 async fn execute(
189 &self,
190 query: &dyn crate::models::QuerySelector,
191 params: &[&dyn crate::models::ToDbValue],
192 ) -> DatabaseResult<u64> {
193 self.write().execute(query, params).await
194 }
195
196 async fn execute_raw(&self, sql: &str) -> DatabaseResult<()> {
197 self.write().execute_raw(sql).await
198 }
199
200 async fn fetch_all(
201 &self,
202 query: &dyn crate::models::QuerySelector,
203 params: &[&dyn crate::models::ToDbValue],
204 ) -> DatabaseResult<Vec<crate::models::JsonRow>> {
205 self.read().fetch_all(query, params).await
206 }
207
208 async fn fetch_one(
209 &self,
210 query: &dyn crate::models::QuerySelector,
211 params: &[&dyn crate::models::ToDbValue],
212 ) -> DatabaseResult<crate::models::JsonRow> {
213 self.read().fetch_one(query, params).await
214 }
215
216 async fn fetch_optional(
217 &self,
218 query: &dyn crate::models::QuerySelector,
219 params: &[&dyn crate::models::ToDbValue],
220 ) -> DatabaseResult<Option<crate::models::JsonRow>> {
221 self.read().fetch_optional(query, params).await
222 }
223
224 async fn fetch_scalar_value(
225 &self,
226 query: &dyn crate::models::QuerySelector,
227 params: &[&dyn crate::models::ToDbValue],
228 ) -> DatabaseResult<crate::models::DbValue> {
229 self.read().fetch_scalar_value(query, params).await
230 }
231
232 async fn begin_transaction(
233 &self,
234 ) -> DatabaseResult<Box<dyn crate::models::DatabaseTransaction>> {
235 self.write().begin_transaction().await
236 }
237
238 async fn get_database_info(&self) -> DatabaseResult<DatabaseInfo> {
239 self.read().get_database_info().await
240 }
241
242 async fn test_connection(&self) -> DatabaseResult<()> {
243 self.read().test_connection().await
244 }
245
246 async fn execute_batch(&self, sql: &str) -> DatabaseResult<()> {
247 self.write().execute_batch(sql).await
248 }
249
250 async fn query_raw(
251 &self,
252 query: &dyn crate::models::QuerySelector,
253 ) -> DatabaseResult<QueryResult> {
254 self.read().query_raw(query).await
255 }
256
257 async fn query_raw_with(
258 &self,
259 query: &dyn crate::models::QuerySelector,
260 params: &[&dyn crate::models::ToDbValue],
261 ) -> DatabaseResult<QueryResult> {
262 self.read().query_raw_with(query, params).await
263 }
264}