1#![deny(missing_docs)]
2
3use nidus_core::NidusError;
9use thiserror::Error;
10
11pub type Result<T> = std::result::Result<T, SqlxError>;
13
14#[derive(Debug, Error)]
16pub enum SqlxError {
17 #[error(transparent)]
19 Sqlx(#[from] sqlx::Error),
20
21 #[error(transparent)]
23 Nidus(#[from] NidusError),
24
25 #[cfg(feature = "nidus-config")]
27 #[error(transparent)]
28 Config(#[from] nidus_config::ConfigError),
29}
30
31#[cfg(feature = "sqlite")]
32mod sqlite {
33 #[cfg(feature = "observability")]
34 use std::time::Instant;
35
36 use super::Result;
37 use nidus_core::Container;
38
/// Connection settings for a SQLite pool.
///
/// Holds the database URL and an optional upper bound on the number of pooled
/// connections.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SqlitePoolConfig {
    database_url: String,
    max_connections: Option<u32>,
}

impl SqlitePoolConfig {
    /// Creates settings for `database_url` with no explicit connection limit.
    #[must_use]
    pub fn new(database_url: impl Into<String>) -> Self {
        Self {
            database_url: database_url.into(),
            max_connections: None,
        }
    }

    /// Returns these settings with the connection limit set to `max_connections`.
    #[must_use]
    pub fn with_max_connections(mut self, max_connections: u32) -> Self {
        self.max_connections = Some(max_connections);
        self
    }

    /// Returns the database URL.
    pub fn database_url(&self) -> &str {
        &self.database_url
    }

    /// Returns the connection limit, or `None` when none was configured.
    pub fn max_connections(&self) -> Option<u32> {
        self.max_connections
    }

    /// Reads pool settings from the value stored at `path` in `config`.
    ///
    /// The value is deserialized into a structure with a required `url` field
    /// and an optional `max_connections` field.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Config::get_required_path_typed` when
    /// the value at `path` cannot be read as such a structure.
    #[cfg(feature = "nidus-config")]
    pub fn from_config_path<I, S>(config: &nidus_config::Config, path: I) -> Result<Self>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        #[derive(serde::Deserialize)]
        struct RawConfig {
            url: String,
            max_connections: Option<u32>,
        }

        let raw: RawConfig = config.get_required_path_typed(path)?;
        Ok(Self {
            database_url: raw.url,
            max_connections: raw.max_connections,
        })
    }
}
92
93 #[derive(Clone, Debug)]
95 pub struct SqlitePoolBuilder {
96 config: SqlitePoolConfig,
97 #[cfg(feature = "observability")]
98 observer: Option<nidus_observability::ObservabilityAdapterObserver>,
99 }
100
101 impl SqlitePoolBuilder {
102 pub fn new() -> Self {
104 Self {
105 config: SqlitePoolConfig::new("sqlite::memory:"),
106 #[cfg(feature = "observability")]
107 observer: None,
108 }
109 }
110
111 pub fn config(mut self, config: SqlitePoolConfig) -> Self {
113 self.config = config;
114 self
115 }
116
117 pub fn database_url(mut self, database_url: impl Into<String>) -> Self {
119 self.config.database_url = database_url.into();
120 self
121 }
122
123 pub fn max_connections(mut self, max_connections: u32) -> Self {
125 self.config.max_connections = Some(max_connections);
126 self
127 }
128
129 #[cfg(feature = "observability")]
131 pub fn observability(
132 mut self,
133 observer: nidus_observability::ObservabilityAdapterObserver,
134 ) -> Self {
135 self.observer = Some(observer);
136 self
137 }
138
139 pub async fn connect(self) -> Result<SqlitePoolProvider> {
141 #[cfg(feature = "observability")]
142 let observer = self.observer;
143 let mut options = sqlx::sqlite::SqlitePoolOptions::new();
144 if let Some(max_connections) = self.config.max_connections {
145 options = options.max_connections(max_connections);
146 }
147 #[cfg(feature = "observability")]
148 let started_at = Instant::now();
149 let pool = options.connect(&self.config.database_url).await;
150 #[cfg(feature = "observability")]
151 record_adapter_operation(
152 &observer,
153 "connect",
154 nidus_observability::OperationStatus::from(pool.is_ok()),
155 started_at,
156 );
157 let pool = pool?;
158 Ok(SqlitePoolProvider {
159 pool,
160 #[cfg(feature = "observability")]
161 observer,
162 })
163 }
164
165 pub async fn register(self, container: &mut Container) -> Result<()> {
167 let provider = self.connect().await?;
168 container.register_singleton(provider)?;
169 Ok(())
170 }
171 }
172
173 impl Default for SqlitePoolBuilder {
174 fn default() -> Self {
175 Self::new()
176 }
177 }
178
179 #[derive(Clone, Debug)]
181 pub struct SqlitePoolProvider {
182 pool: sqlx::SqlitePool,
183 #[cfg(feature = "observability")]
184 observer: Option<nidus_observability::ObservabilityAdapterObserver>,
185 }
186
187 impl SqlitePoolProvider {
188 pub fn builder() -> SqlitePoolBuilder {
190 SqlitePoolBuilder::new()
191 }
192
193 pub fn from_pool(pool: sqlx::SqlitePool) -> Self {
195 Self {
196 pool,
197 #[cfg(feature = "observability")]
198 observer: None,
199 }
200 }
201
202 pub fn pool(&self) -> &sqlx::SqlitePool {
204 &self.pool
205 }
206
207 pub fn into_pool(self) -> sqlx::SqlitePool {
209 self.pool
210 }
211
212 #[cfg(feature = "health")]
214 pub async fn health_status(&self) -> nidus_http::health::HealthStatus {
215 #[cfg(feature = "observability")]
216 let started_at = Instant::now();
217 let result = sqlx::query("SELECT 1").execute(&self.pool).await;
218 #[cfg(feature = "observability")]
219 record_adapter_operation(
220 &self.observer,
221 "health",
222 nidus_observability::OperationStatus::from(result.is_ok()),
223 started_at,
224 );
225 match result {
226 Ok(_) => nidus_http::health::HealthStatus::up(),
227 Err(error) => nidus_http::health::HealthStatus::down(error.to_string()),
228 }
229 }
230
231 #[cfg(feature = "health")]
237 pub fn register_ready_check(
238 self: std::sync::Arc<Self>,
239 registry: nidus_http::health::HealthRegistry,
240 name: impl Into<String>,
241 ) -> nidus_http::health::HealthRegistry {
242 registry.ready_check(name, move || {
243 let provider = std::sync::Arc::clone(&self);
244 async move { provider.health_status().await }
245 })
246 }
247 }
248
/// Reports one adapter operation to `observer`, doing nothing when no observer
/// is present.
///
/// The observer receives the label `"nidus-sqlx"`, the `operation` name, its
/// `status`, and the time elapsed since `started_at`.
#[cfg(feature = "observability")]
fn record_adapter_operation(
    observer: &Option<nidus_observability::ObservabilityAdapterObserver>,
    operation: &'static str,
    status: nidus_observability::OperationStatus,
    started_at: Instant,
) {
    let Some(sink) = observer else {
        return;
    };
    sink.record("nidus-sqlx", operation, status, started_at.elapsed());
}
260}
261
262#[cfg(feature = "sqlite")]
263pub use sqlite::{SqlitePoolBuilder, SqlitePoolConfig, SqlitePoolProvider};
264
265#[cfg(feature = "postgres")]
266mod postgres {
267 #[cfg(feature = "observability")]
268 use std::time::Instant;
269
270 use super::Result;
271 use nidus_core::Container;
272
/// Connection settings for a PostgreSQL pool.
///
/// Holds the database URL plus optional upper and lower bounds on the number
/// of pooled connections.
///
/// The `Debug` representation masks a password found in the URL's user-info
/// part or in a `password` query parameter, so formatting the settings with
/// `{:?}` does not print it. [`database_url`](Self::database_url) still
/// returns the URL unchanged.
#[derive(Clone, Eq, PartialEq)]
pub struct PostgresPoolConfig {
    database_url: String,
    max_connections: Option<u32>,
    min_connections: Option<u32>,
}

impl std::fmt::Debug for PostgresPoolConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PostgresPoolConfig")
            .field("database_url", &redact_password(&self.database_url))
            .field("max_connections", &self.max_connections)
            .field("min_connections", &self.min_connections)
            .finish()
    }
}

/// Returns `url` with its password, if any, replaced by `***`.
///
/// Two places are masked: the part after the first `:` of the user-info
/// component (`scheme://user:password@host`), and the value of a `password`
/// query parameter. Everything else is copied through unchanged.
fn redact_password(url: &str) -> String {
    // Peel off fragment and query first so that `@`, `:` or `/` characters
    // inside them are never mistaken for part of the authority.
    let (rest, fragment) = match url.split_once('#') {
        Some((rest, fragment)) => (rest, Some(fragment)),
        None => (url, None),
    };
    let (base, query) = match rest.split_once('?') {
        Some((base, query)) => (base, Some(query)),
        None => (rest, None),
    };

    let mut redacted = String::with_capacity(url.len());
    match base.split_once("://") {
        Some((scheme, after_scheme)) => {
            // The authority runs up to the first `/` that starts the path.
            let authority_len = after_scheme.find('/').unwrap_or(after_scheme.len());
            let (authority, path) = after_scheme.split_at(authority_len);
            redacted.push_str(scheme);
            redacted.push_str("://");
            match authority.rsplit_once('@') {
                Some((userinfo, host)) => {
                    match userinfo.split_once(':') {
                        Some((user, _password)) => {
                            redacted.push_str(user);
                            redacted.push_str(":***");
                        }
                        None => redacted.push_str(userinfo),
                    }
                    redacted.push('@');
                    redacted.push_str(host);
                }
                None => redacted.push_str(authority),
            }
            redacted.push_str(path);
        }
        None => redacted.push_str(base),
    }

    if let Some(query) = query {
        redacted.push('?');
        for (index, pair) in query.split('&').enumerate() {
            if index > 0 {
                redacted.push('&');
            }
            match pair.split_once('=') {
                Some(("password", _)) => redacted.push_str("password=***"),
                _ => redacted.push_str(pair),
            }
        }
    }
    if let Some(fragment) = fragment {
        redacted.push('#');
        redacted.push_str(fragment);
    }
    redacted
}

impl PostgresPoolConfig {
    /// Creates settings for `database_url` with no explicit connection bounds.
    #[must_use]
    pub fn new(database_url: impl Into<String>) -> Self {
        Self {
            database_url: database_url.into(),
            max_connections: None,
            min_connections: None,
        }
    }

    /// Returns these settings with the upper connection bound set to
    /// `max_connections`.
    #[must_use]
    pub fn with_max_connections(mut self, max_connections: u32) -> Self {
        self.max_connections = Some(max_connections);
        self
    }

    /// Returns these settings with the lower connection bound set to
    /// `min_connections`.
    #[must_use]
    pub fn with_min_connections(mut self, min_connections: u32) -> Self {
        self.min_connections = Some(min_connections);
        self
    }

    /// Returns the database URL exactly as it was supplied.
    pub fn database_url(&self) -> &str {
        &self.database_url
    }

    /// Returns the upper connection bound, or `None` when none was configured.
    pub fn max_connections(&self) -> Option<u32> {
        self.max_connections
    }

    /// Returns the lower connection bound, or `None` when none was configured.
    pub fn min_connections(&self) -> Option<u32> {
        self.min_connections
    }

    /// Reads pool settings from the value stored at `path` in `config`.
    ///
    /// The value is deserialized into a structure with a required `url` field
    /// and optional `max_connections` and `min_connections` fields.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Config::get_required_path_typed` when
    /// the value at `path` cannot be read as such a structure.
    #[cfg(feature = "nidus-config")]
    pub fn from_config_path<I, S>(config: &nidus_config::Config, path: I) -> Result<Self>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        #[derive(serde::Deserialize)]
        struct RawConfig {
            url: String,
            max_connections: Option<u32>,
            min_connections: Option<u32>,
        }

        let raw: RawConfig = config.get_required_path_typed(path)?;
        Ok(Self {
            database_url: raw.url,
            max_connections: raw.max_connections,
            min_connections: raw.min_connections,
        })
    }
}
343
344 #[derive(Clone, Debug)]
346 pub struct PostgresPoolBuilder {
347 config: PostgresPoolConfig,
348 #[cfg(feature = "observability")]
349 observer: Option<nidus_observability::ObservabilityAdapterObserver>,
350 }
351
352 impl PostgresPoolBuilder {
353 pub fn new(database_url: impl Into<String>) -> Self {
355 Self {
356 config: PostgresPoolConfig::new(database_url),
357 #[cfg(feature = "observability")]
358 observer: None,
359 }
360 }
361
362 pub fn config(mut self, config: PostgresPoolConfig) -> Self {
364 self.config = config;
365 self
366 }
367
368 pub fn database_url(mut self, database_url: impl Into<String>) -> Self {
370 self.config.database_url = database_url.into();
371 self
372 }
373
374 pub fn max_connections(mut self, max_connections: u32) -> Self {
376 self.config.max_connections = Some(max_connections);
377 self
378 }
379
380 pub fn min_connections(mut self, min_connections: u32) -> Self {
382 self.config.min_connections = Some(min_connections);
383 self
384 }
385
386 #[cfg(feature = "observability")]
388 pub fn observability(
389 mut self,
390 observer: nidus_observability::ObservabilityAdapterObserver,
391 ) -> Self {
392 self.observer = Some(observer);
393 self
394 }
395
396 pub async fn connect(self) -> Result<PostgresPoolProvider> {
398 #[cfg(feature = "observability")]
399 let observer = self.observer;
400 let mut options = sqlx::postgres::PgPoolOptions::new();
401 if let Some(max_connections) = self.config.max_connections {
402 options = options.max_connections(max_connections);
403 }
404 if let Some(min_connections) = self.config.min_connections {
405 options = options.min_connections(min_connections);
406 }
407 #[cfg(feature = "observability")]
408 let started_at = Instant::now();
409 let pool = options.connect(&self.config.database_url).await;
410 #[cfg(feature = "observability")]
411 record_adapter_operation(
412 &observer,
413 "connect",
414 nidus_observability::OperationStatus::from(pool.is_ok()),
415 started_at,
416 );
417 let pool = pool?;
418 Ok(PostgresPoolProvider {
419 pool,
420 #[cfg(feature = "observability")]
421 observer,
422 })
423 }
424
425 pub async fn register(self, container: &mut Container) -> Result<()> {
427 let provider = self.connect().await?;
428 container.register_singleton(provider)?;
429 Ok(())
430 }
431 }
432
433 #[derive(Clone, Debug)]
435 pub struct PostgresPoolProvider {
436 pool: sqlx::PgPool,
437 #[cfg(feature = "observability")]
438 observer: Option<nidus_observability::ObservabilityAdapterObserver>,
439 }
440
441 impl PostgresPoolProvider {
442 pub fn builder(database_url: impl Into<String>) -> PostgresPoolBuilder {
444 PostgresPoolBuilder::new(database_url)
445 }
446
447 pub fn from_pool(pool: sqlx::PgPool) -> Self {
449 Self {
450 pool,
451 #[cfg(feature = "observability")]
452 observer: None,
453 }
454 }
455
456 pub fn pool(&self) -> &sqlx::PgPool {
458 &self.pool
459 }
460
461 pub fn into_pool(self) -> sqlx::PgPool {
463 self.pool
464 }
465
466 #[cfg(feature = "health")]
468 pub async fn health_status(&self) -> nidus_http::health::HealthStatus {
469 #[cfg(feature = "observability")]
470 let started_at = Instant::now();
471 let result = sqlx::query("SELECT 1").execute(&self.pool).await;
472 #[cfg(feature = "observability")]
473 record_adapter_operation(
474 &self.observer,
475 "health",
476 nidus_observability::OperationStatus::from(result.is_ok()),
477 started_at,
478 );
479 match result {
480 Ok(_) => nidus_http::health::HealthStatus::up(),
481 Err(error) => nidus_http::health::HealthStatus::down(error.to_string()),
482 }
483 }
484
485 #[cfg(feature = "health")]
491 pub fn register_ready_check(
492 self: std::sync::Arc<Self>,
493 registry: nidus_http::health::HealthRegistry,
494 name: impl Into<String>,
495 ) -> nidus_http::health::HealthRegistry {
496 registry.ready_check(name, move || {
497 let provider = std::sync::Arc::clone(&self);
498 async move { provider.health_status().await }
499 })
500 }
501 }
502
/// Reports one adapter operation to `observer`, doing nothing when no observer
/// is present.
///
/// The observer receives the label `"nidus-sqlx"`, the `operation` name, its
/// `status`, and the time elapsed since `started_at`.
#[cfg(feature = "observability")]
fn record_adapter_operation(
    observer: &Option<nidus_observability::ObservabilityAdapterObserver>,
    operation: &'static str,
    status: nidus_observability::OperationStatus,
    started_at: Instant,
) {
    let Some(sink) = observer else {
        return;
    };
    sink.record("nidus-sqlx", operation, status, started_at.elapsed());
}
514}
515
516#[cfg(feature = "postgres")]
517pub use postgres::{PostgresPoolBuilder, PostgresPoolConfig, PostgresPoolProvider};