// systemprompt_database/services/database.rs
use super::postgres::PostgresProvider;
6use super::provider::DatabaseProvider;
7use crate::error::{DatabaseResult, RepositoryError};
8use crate::models::{DatabaseInfo, QueryResult};
9use std::sync::Arc;
10
11pub struct Database {
12 provider: Arc<dyn DatabaseProvider>,
13 write_provider: Option<Arc<dyn DatabaseProvider>>,
14}
15
16impl std::fmt::Debug for Database {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 f.debug_struct("Database")
19 .field("backend", &"PostgreSQL")
20 .finish()
21 }
22}
23
24impl Database {
25 pub async fn new_postgres(url: &str) -> DatabaseResult<Self> {
26 let provider = PostgresProvider::new(url).await?;
27 Ok(Self {
28 provider: Arc::new(provider),
29 write_provider: None,
30 })
31 }
32
33 pub async fn from_config(db_type: &str, url: &str) -> DatabaseResult<Self> {
34 match db_type.to_lowercase().as_str() {
35 "postgres" | "postgresql" | "" => Self::new_postgres(url).await,
36 other => Err(RepositoryError::invalid_argument(format!(
37 "Unsupported database type: {other}. Only PostgreSQL is supported."
38 ))),
39 }
40 }
41
42 pub async fn from_config_with_write(
43 db_type: &str,
44 read_url: &str,
45 write_url: Option<&str>,
46 ) -> DatabaseResult<Self> {
47 let provider: Arc<dyn DatabaseProvider> = match db_type.to_lowercase().as_str() {
48 "postgres" | "postgresql" | "" => Arc::new(PostgresProvider::new(read_url).await?),
49 other => {
50 return Err(RepositoryError::invalid_argument(format!(
51 "Unsupported database type: {other}. Only PostgreSQL is supported."
52 )));
53 },
54 };
55
56 let write_provider: Option<Arc<dyn DatabaseProvider>> = match write_url {
57 Some(url) => Some(Arc::new(PostgresProvider::new(url).await?)),
58 None => None,
59 };
60
61 Ok(Self {
62 provider,
63 write_provider,
64 })
65 }
66
67 #[must_use]
68 pub fn from_pools(read: Arc<sqlx::PgPool>, write: Option<Arc<sqlx::PgPool>>) -> Self {
69 let write_provider = write.map(|pool| -> Arc<dyn DatabaseProvider> {
70 Arc::new(PostgresProvider::from_pool(pool))
71 });
72 Self {
73 provider: Arc::new(PostgresProvider::from_pool(read)),
74 write_provider,
75 }
76 }
77
78 fn require_postgres(pool: Option<Arc<sqlx::PgPool>>) -> DatabaseResult<Arc<sqlx::PgPool>> {
79 pool.ok_or_else(|| RepositoryError::invalid_state("Database is not PostgreSQL"))
80 }
81
82 #[must_use]
83 pub fn read(&self) -> &dyn DatabaseProvider {
84 self.provider.as_ref()
85 }
86
87 #[must_use]
88 pub fn write(&self) -> &dyn DatabaseProvider {
89 self.write_provider
90 .as_deref()
91 .unwrap_or_else(|| self.provider.as_ref())
92 }
93
94 #[must_use]
95 pub fn pool(&self) -> Option<Arc<sqlx::PgPool>> {
96 self.read().get_postgres_pool()
97 }
98
99 pub fn pool_arc(&self) -> DatabaseResult<Arc<sqlx::PgPool>> {
100 Self::require_postgres(self.read().get_postgres_pool())
101 }
102
103 #[must_use]
104 pub fn write_pool(&self) -> Option<Arc<sqlx::PgPool>> {
105 self.write().get_postgres_pool()
106 }
107
108 pub fn write_pool_arc(&self) -> DatabaseResult<Arc<sqlx::PgPool>> {
109 Self::require_postgres(self.write().get_postgres_pool())
110 }
111
112 #[must_use]
113 pub fn has_write_pool(&self) -> bool {
114 self.write_provider.is_some()
115 }
116
117 pub async fn execute_batch(&self, sql: &str) -> DatabaseResult<()> {
118 self.write().execute_batch(sql).await
119 }
120
121 pub async fn get_info(&self) -> DatabaseResult<DatabaseInfo> {
122 self.read().get_database_info().await
123 }
124
125 pub async fn test_connection(&self) -> DatabaseResult<()> {
126 self.provider.test_connection().await?;
127 if let Some(wp) = &self.write_provider {
128 wp.test_connection().await?;
129 }
130 Ok(())
131 }
132
133 pub async fn begin(&self) -> DatabaseResult<sqlx::Transaction<'_, sqlx::Postgres>> {
134 let pool = self.write_pool_arc()?;
135 pool.begin().await.map_err(Into::into)
136 }
137}
138
139pub type DbPool = Arc<Database>;
140
141pub trait DatabaseExt {
142 fn database(&self) -> Arc<Database>;
143}
144
145impl DatabaseExt for Arc<Database> {
146 fn database(&self) -> Arc<Database> {
147 Self::clone(self)
148 }
149}
150
151#[async_trait::async_trait]
152impl DatabaseProvider for Database {
153 fn get_postgres_pool(&self) -> Option<Arc<sqlx::PgPool>> {
154 self.read().get_postgres_pool()
155 }
156
157 async fn execute(
158 &self,
159 query: &dyn crate::models::QuerySelector,
160 params: &[&dyn crate::models::ToDbValue],
161 ) -> DatabaseResult<u64> {
162 self.write().execute(query, params).await
163 }
164
165 async fn execute_raw(&self, sql: &str) -> DatabaseResult<()> {
166 self.write().execute_raw(sql).await
167 }
168
169 async fn fetch_all(
170 &self,
171 query: &dyn crate::models::QuerySelector,
172 params: &[&dyn crate::models::ToDbValue],
173 ) -> DatabaseResult<Vec<crate::models::JsonRow>> {
174 self.read().fetch_all(query, params).await
175 }
176
177 async fn fetch_one(
178 &self,
179 query: &dyn crate::models::QuerySelector,
180 params: &[&dyn crate::models::ToDbValue],
181 ) -> DatabaseResult<crate::models::JsonRow> {
182 self.read().fetch_one(query, params).await
183 }
184
185 async fn fetch_optional(
186 &self,
187 query: &dyn crate::models::QuerySelector,
188 params: &[&dyn crate::models::ToDbValue],
189 ) -> DatabaseResult<Option<crate::models::JsonRow>> {
190 self.read().fetch_optional(query, params).await
191 }
192
193 async fn fetch_scalar_value(
194 &self,
195 query: &dyn crate::models::QuerySelector,
196 params: &[&dyn crate::models::ToDbValue],
197 ) -> DatabaseResult<crate::models::DbValue> {
198 self.read().fetch_scalar_value(query, params).await
199 }
200
201 async fn begin_transaction(
202 &self,
203 ) -> DatabaseResult<Box<dyn crate::models::DatabaseTransaction>> {
204 self.write().begin_transaction().await
205 }
206
207 async fn get_database_info(&self) -> DatabaseResult<DatabaseInfo> {
208 self.read().get_database_info().await
209 }
210
211 async fn test_connection(&self) -> DatabaseResult<()> {
212 self.read().test_connection().await
213 }
214
215 async fn execute_batch(&self, sql: &str) -> DatabaseResult<()> {
216 self.write().execute_batch(sql).await
217 }
218
219 async fn query_raw(
220 &self,
221 query: &dyn crate::models::QuerySelector,
222 ) -> DatabaseResult<QueryResult> {
223 self.read().query_raw(query).await
224 }
225
226 async fn query_raw_with(
227 &self,
228 query: &dyn crate::models::QuerySelector,
229 params: &[&dyn crate::models::ToDbValue],
230 ) -> DatabaseResult<QueryResult> {
231 self.read().query_raw_with(query, params).await
232 }
233}