#![forbid(unsafe_code)]
#![warn(missing_docs)]
#![allow(clippy::struct_excessive_bools)]
#![allow(clippy::unnecessary_wraps)]
#![allow(clippy::unnecessary_cast)]
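//! PostgreSQL event store backend for EventCore, providing a
//! connection-pooled, retry-aware [`PostgresEventStore`] plus circuit
//! breaking, pool monitoring, and schema initialization utilities.
//!
//! A minimal quick-start sketch, assuming a running PostgreSQL instance at
//! the given URL, an application-defined event type `MyEvent`, and that the
//! crate is imported as `eventcore_postgres`:
//!
//! ```ignore
//! use eventcore_postgres::{PostgresConfig, PostgresEventStore};
//!
//! let config = PostgresConfig::new("postgres://postgres:postgres@localhost/eventcore");
//! let store: PostgresEventStore<MyEvent> = PostgresEventStore::new(config).await?;
//! store.initialize().await?;
//! ```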

pub mod circuit_breaker;
mod event_store;
pub mod monitoring;
pub mod retry;

use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use eventcore::serialization::SerializationFormat;
use eventcore::{EventId, EventStoreError, ReadOptions, StoredEvent, StreamId};
use serde::{Deserialize, Serialize};
use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
use thiserror::Error;
use tracing::{debug, info, instrument};

pub use circuit_breaker::{
    CircuitBreaker, CircuitBreakerConfig, CircuitBreakerError, CircuitBreakerMetrics, CircuitState,
};
pub use monitoring::{AcquisitionTimer, PoolMetrics, PoolMonitor, PoolMonitoringTask};
pub use retry::{RetryError, RetryStrategy};

/// Result of a comprehensive health check against the event store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
    /// Whether the store is considered healthy overall.
    pub is_healthy: bool,
    /// Latency of a basic `SELECT 1` round trip.
    pub basic_latency: Duration,
    /// Connection pool status at the time of the check.
    pub pool_status: PoolStatus,
    /// Schema verification status.
    pub schema_status: SchemaStatus,
    /// Query performance status.
    pub performance_status: PerformanceStatus,
    /// When the check was performed.
    pub last_check: DateTime<Utc>,
}

/// Snapshot of connection pool state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStatus {
    /// Total number of connections currently in the pool.
    pub size: u32,
    /// Number of idle connections.
    pub idle: u32,
    /// Whether the pool has been closed.
    pub is_closed: bool,
}

/// Result of verifying that the expected database schema is present.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaStatus {
    /// Whether the `events` table exists.
    pub has_events_table: bool,
    /// Whether a dedicated streams table exists.
    pub has_streams_table: bool,
    /// Whether the subscription checkpoints table exists.
    pub has_subscriptions_table: bool,
    /// Whether the schema is complete enough for normal operation.
    pub is_complete: bool,
}

/// Result of a lightweight query performance probe.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceStatus {
    /// Latency of the probe query.
    pub query_latency: Duration,
    /// Whether the latency was below the performance threshold.
    pub is_performant: bool,
}

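/// Configuration for the PostgreSQL event store.
///
/// A minimal sketch of overriding a few settings via struct-update syntax
/// (the URL is a placeholder):
///
/// ```ignore
/// let config = PostgresConfig {
///     max_connections: 10,
///     test_before_acquire: true,
///     ..PostgresConfig::new("postgres://localhost/eventcore")
/// };
/// ```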
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostgresConfig {
    /// PostgreSQL connection URL.
    pub database_url: String,

    /// Maximum number of connections in the pool.
    pub max_connections: u32,

    /// Minimum number of idle connections to maintain.
    pub min_connections: u32,

    /// Timeout for acquiring a connection from the pool.
    pub connect_timeout: Duration,

    /// Per-statement timeout, applied via PostgreSQL's `statement_timeout`.
    pub query_timeout: Option<Duration>,

    /// Maximum lifetime of a pooled connection.
    pub max_lifetime: Option<Duration>,

    /// How long a connection may sit idle before being closed.
    pub idle_timeout: Option<Duration>,

    /// Whether to test connections before handing them out.
    pub test_before_acquire: bool,

    /// Maximum number of retry attempts for transient failures.
    pub max_retries: u32,

    /// Base delay for exponential backoff between retries.
    pub retry_base_delay: Duration,

    /// Upper bound on the backoff delay.
    pub retry_max_delay: Duration,

    /// Whether automatic connection recovery is enabled.
    pub enable_recovery: bool,

    /// Interval between periodic health checks.
    pub health_check_interval: Duration,

    /// Number of events fetched per page when reading streams.
    pub read_batch_size: usize,

    /// Serialization format used for event payloads.
    pub serialization_format: SerializationFormat,
}

impl PostgresConfig {
    /// Creates a configuration with the given database URL and default
    /// settings for everything else.
    pub fn new(database_url: impl Into<String>) -> Self {
        Self {
            database_url: database_url.into(),
            ..Self::default()
        }
    }
}

impl Default for PostgresConfig {
    fn default() -> Self {
        Self {
            database_url: "postgres://postgres:postgres@localhost/eventcore".to_string(),
            max_connections: 20,
            min_connections: 2,
            connect_timeout: Duration::from_secs(10),
            query_timeout: Some(Duration::from_secs(30)),
            max_lifetime: Some(Duration::from_secs(1800)),
            idle_timeout: Some(Duration::from_secs(600)),
            test_before_acquire: false,
            max_retries: 3,
            retry_base_delay: Duration::from_millis(100),
            retry_max_delay: Duration::from_secs(5),
            enable_recovery: true,
            health_check_interval: Duration::from_secs(30),
            read_batch_size: 1000,
            serialization_format: SerializationFormat::default(),
        }
    }
}

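/// Errors produced by the PostgreSQL event store backend.
///
/// Every variant converts into an [`EventStoreError`]; a small sketch of the
/// mapping:
///
/// ```ignore
/// let err = PostgresError::Transaction("deadlock detected".to_string());
/// let mapped: EventStoreError = err.into();
/// assert!(matches!(mapped, EventStoreError::TransactionRollback(_)));
/// ```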
#[derive(Debug, Error)]
pub enum PostgresError {
    /// Underlying database or connection failure.
    #[error("Database connection error: {0}")]
    Connection(#[from] sqlx::Error),

    /// The connection pool could not be created.
    #[error("Failed to create connection pool: {0}")]
    PoolCreation(String),

    /// A schema migration or initialization step failed.
    #[error("Database migration error: {0}")]
    Migration(String),

    /// Event payloads could not be serialized or deserialized.
    #[error("JSON serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// A transaction could not be completed and was rolled back.
    #[error("Transaction error: {0}")]
    Transaction(String),
}

#[allow(clippy::fallible_impl_from)]
impl From<PostgresError> for EventStoreError {
    fn from(error: PostgresError) -> Self {
        match error {
            PostgresError::Connection(sqlx_error) => {
                use sqlx::Error::{
                    Configuration, Database, Io, PoolClosed, PoolTimedOut, Protocol, RowNotFound,
                    Tls,
                };
                match &sqlx_error {
                    Configuration(_) => Self::Configuration(sqlx_error.to_string()),
                    Database(db_err) => {
                        // Map well-known PostgreSQL error codes first.
                        if let Some(code) = db_err.code() {
                            match code.as_ref() {
                                // 23505: unique_violation
                                "23505" => {
                                    return Self::ConnectionFailed(format!(
                                        "Unique constraint violation: {db_err}"
                                    ));
                                }
                                // 40001: serialization_failure, raised by the
                                // version-enforcement triggers on conflict.
                                "40001" => {
                                    use eventcore::{EventVersion, StreamId};
                                    return Self::VersionConflict {
                                        stream: StreamId::try_new("serialization-conflict")
                                            .unwrap(),
                                        expected: EventVersion::initial(),
                                        current: EventVersion::try_new(1).unwrap(),
                                    };
                                }
                                _ => {}
                            }
                        }

                        // Fall back to message sniffing for serialization
                        // failures that arrive without an error code.
                        let error_msg = db_err.to_string().to_lowercase();
                        if error_msg.contains("could not serialize access due to concurrent update")
                            || error_msg.contains("serialization failure")
                        {
                            use eventcore::{EventVersion, StreamId};
                            return Self::VersionConflict {
                                stream: StreamId::try_new("serialization-conflict").unwrap(),
                                expected: EventVersion::initial(),
                                current: EventVersion::try_new(1).unwrap(),
                            };
                        }

                        Self::ConnectionFailed(db_err.to_string())
                    }
                    Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed => {
                        Self::ConnectionFailed(sqlx_error.to_string())
                    }
                    RowNotFound => Self::ConnectionFailed(format!("Row not found: {sqlx_error}")),
                    _ => Self::Internal(sqlx_error.to_string()),
                }
            }
            PostgresError::PoolCreation(msg) => Self::ConnectionFailed(msg),
            PostgresError::Migration(msg) => Self::Configuration(msg),
            PostgresError::Serialization(err) => Self::SerializationFailed(err.to_string()),
            PostgresError::Transaction(msg) => Self::TransactionRollback(msg),
        }
    }
}

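/// A PostgreSQL-backed event store, generic over the application event type `E`.
///
/// Cloning is cheap: the connection pool and monitor are shared via [`Arc`].
/// A construction sketch with a custom retry strategy (the URL is a
/// placeholder and `MyEvent` an assumed application event type; the
/// `RetryStrategy` fields mirror those used by [`PostgresEventStore::new`]):
///
/// ```ignore
/// let config = PostgresConfig::new("postgres://localhost/eventcore");
/// let retry = RetryStrategy {
///     max_attempts: 5,
///     base_delay: Duration::from_millis(50),
///     max_delay: Duration::from_secs(2),
///     ..RetryStrategy::default()
/// };
/// let store: PostgresEventStore<MyEvent> =
///     PostgresEventStore::new_with_retry_strategy(config, retry).await?;
/// ```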
pub struct PostgresEventStore<E>
where
    E: Send + Sync,
{
    pool: Arc<PgPool>,
    config: PostgresConfig,
    retry_strategy: RetryStrategy,
    monitor: Arc<monitoring::PoolMonitor>,
    // Ties the store to its event type without storing a value of it.
    _phantom: PhantomData<E>,
}

impl<E> Clone for PostgresEventStore<E>
where
    E: Serialize
        + for<'de> Deserialize<'de>
        + Send
        + Sync
        + std::fmt::Debug
        + Clone
        + PartialEq
        + Eq
        + 'static,
{
    fn clone(&self) -> Self {
        Self {
            pool: Arc::clone(&self.pool),
            config: self.config.clone(),
            retry_strategy: self.retry_strategy.clone(),
            monitor: Arc::clone(&self.monitor),
            _phantom: PhantomData,
        }
    }
}

impl<E> PostgresEventStore<E>
where
    E: Serialize
        + for<'de> Deserialize<'de>
        + Send
        + Sync
        + std::fmt::Debug
        + Clone
        + PartialEq
        + Eq
        + 'static,
{
    /// Creates a new event store, building a connection pool and deriving a
    /// retry strategy from the supplied configuration.
    pub async fn new(config: PostgresConfig) -> Result<Self, PostgresError> {
        let pool = Self::create_pool(&config).await?;

        let retry_strategy = RetryStrategy {
            max_attempts: config.max_retries,
            base_delay: config.retry_base_delay,
            max_delay: config.retry_max_delay,
            ..RetryStrategy::default()
        };

        let monitor = Arc::new(monitoring::PoolMonitor::new(config.max_connections));

        Ok(Self {
            pool: Arc::new(pool),
            config,
            retry_strategy,
            monitor,
            _phantom: PhantomData,
        })
    }

    /// Creates a new event store with an explicit retry strategy, bypassing
    /// the retry settings in `config`.
    pub async fn new_with_retry_strategy(
        config: PostgresConfig,
        retry_strategy: RetryStrategy,
    ) -> Result<Self, PostgresError> {
        let pool = Self::create_pool(&config).await?;
        let monitor = Arc::new(monitoring::PoolMonitor::new(config.max_connections));

        Ok(Self {
            pool: Arc::new(pool),
            config,
            retry_strategy,
            monitor,
            _phantom: PhantomData,
        })
    }

    async fn create_pool(config: &PostgresConfig) -> Result<PgPool, PostgresError> {
        let mut connect_options: PgConnectOptions = config
            .database_url
            .parse()
            .map_err(|e| PostgresError::PoolCreation(format!("Invalid database URL: {e}")))?;

        // Apply the query timeout as a server-side statement_timeout so that
        // runaway queries are cancelled by PostgreSQL itself.
        if let Some(query_timeout) = config.query_timeout {
            #[allow(clippy::cast_possible_truncation)]
            let timeout_ms = query_timeout.as_millis() as u64;
            connect_options =
                connect_options.options([("statement_timeout", &timeout_ms.to_string())]);
        }

        let mut pool_options = PgPoolOptions::new()
            .max_connections(config.max_connections)
            .min_connections(config.min_connections)
            .acquire_timeout(config.connect_timeout)
            .test_before_acquire(config.test_before_acquire);

        if let Some(max_lifetime) = config.max_lifetime {
            pool_options = pool_options.max_lifetime(max_lifetime);
        }

        if let Some(idle_timeout) = config.idle_timeout {
            pool_options = pool_options.idle_timeout(idle_timeout);
        }

        pool_options
            .connect_with(connect_options)
            .await
            .map_err(|e| PostgresError::PoolCreation(format!("Failed to create pool: {e}")))
    }

    /// Returns a reference to the underlying connection pool.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Returns the configuration this store was built with.
    pub const fn config(&self) -> &PostgresConfig {
        &self.config
    }

    /// Runs database migrations. Currently a no-op placeholder.
    #[allow(clippy::unused_async)]
    pub async fn migrate(&self) -> Result<(), PostgresError> {
        info!("Database migrations will be implemented in Phase 8.3");
        Ok(())
    }

    /// Checks whether the core tables already exist in the public schema.
    async fn schema_exists(&self) -> Result<bool, PostgresError> {
        let count = sqlx::query_scalar::<_, i64>(
            r"
            SELECT COUNT(*) FROM information_schema.tables
            WHERE table_name IN ('events', 'event_streams')
            AND table_schema = 'public'
            ",
        )
        .fetch_one(self.pool.as_ref())
        .await
        .map_err(PostgresError::Connection)?;

        Ok(count >= 2)
    }

    /// Attempts to take the advisory lock guarding schema initialization.
    /// Returns `true` if the lock was acquired.
    async fn try_acquire_schema_lock(&self, lock_id: i64) -> Result<bool, PostgresError> {
        sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock($1)")
            .bind(lock_id)
            .fetch_one(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)
    }

    /// Waits for another process to finish schema initialization, polling the
    /// advisory lock. Returns `true` if this process acquired the lock and
    /// should perform initialization itself, `false` if the schema was
    /// initialized elsewhere.
    async fn wait_for_schema_initialization(&self, lock_id: i64) -> Result<bool, PostgresError> {
        // Poll for roughly five seconds (50 attempts at 100ms intervals).
        for _ in 0..50 {
            tokio::time::sleep(Duration::from_millis(100)).await;

            if self.try_acquire_schema_lock(lock_id).await? {
                return Ok(true);
            }

            if self.schema_exists().await? {
                debug!("Database schema initialized by another process while waiting");
                return Ok(false);
            }
        }

        // One final attempt before giving up.
        if self.try_acquire_schema_lock(lock_id).await? {
            Ok(true)
        } else if self.schema_exists().await? {
            debug!("Database schema initialized by another process");
            Ok(false)
        } else {
            Err(PostgresError::Migration(
                "Failed to acquire schema initialization lock after multiple attempts".to_string(),
            ))
        }
    }

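    /// Initializes the database schema: the `events` table, its indexes, the
    /// `gen_uuidv7()` helper, and the version-enforcement triggers. Guarded by
    /// an advisory lock, so concurrent callers across processes are safe.
    ///
    /// A minimal startup sketch (`store` is an assumed `PostgresEventStore`):
    ///
    /// ```ignore
    /// store.initialize().await?;
    /// assert!(store.health_check().await?.schema_status.is_complete);
    /// ```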
    #[allow(clippy::too_many_lines)]
    pub async fn initialize(&self) -> Result<(), PostgresError> {
        // Crate-specific advisory lock key guarding schema setup.
        const SCHEMA_LOCK_ID: i64 = 123_456_789;

        let mut lock_acquired = self.try_acquire_schema_lock(SCHEMA_LOCK_ID).await?;

        if !lock_acquired {
            // Give the current lock holder a moment, then check whether it
            // already finished the work.
            tokio::time::sleep(Duration::from_millis(100)).await;

            if self.schema_exists().await? {
                debug!("Database schema already initialized by another process");
                return Ok(());
            }

            lock_acquired = self.wait_for_schema_initialization(SCHEMA_LOCK_ID).await?;
            if !lock_acquired {
                return Ok(());
            }
        }

        let result = async {
            // gen_random_bytes(), used by gen_uuidv7(), comes from pgcrypto.
            sqlx::query("CREATE EXTENSION IF NOT EXISTS pgcrypto")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            // Server-side UUIDv7 generator so event IDs sort by creation time.
            sqlx::query(
                r"
                CREATE OR REPLACE FUNCTION gen_uuidv7() RETURNS UUID AS $$
                DECLARE
                    unix_ts_ms BIGINT;
                    uuid_bytes BYTEA;
                BEGIN
                    -- Get current timestamp in milliseconds since Unix epoch
                    unix_ts_ms := (extract(epoch from clock_timestamp()) * 1000)::BIGINT;

                    -- Create a 16-byte UUID:
                    -- Bytes 0-5: 48-bit big-endian timestamp (milliseconds since Unix epoch)
                    -- Bytes 6-7: 4-bit version (0111) + 12 random bits
                    -- Bytes 8-9: 2-bit variant (10) + 14 random bits
                    -- Bytes 10-15: 48 random bits

                    -- Build the UUID byte by byte
                    uuid_bytes :=
                        -- Timestamp: 48 bits, big-endian
                        substring(int8send(unix_ts_ms) from 3 for 6) ||
                        -- Version (0111 = 7) + random: 16 bits
                        -- First byte: 0111RRRR (where R is random)
                        set_byte(gen_random_bytes(1), 0, (get_byte(gen_random_bytes(1), 0) & 15) | 112) ||
                        -- Second byte: all random
                        gen_random_bytes(1) ||
                        -- Variant (10) + random: 16 bits
                        -- First byte: 10RRRRRR (where R is random)
                        set_byte(gen_random_bytes(1), 0, (get_byte(gen_random_bytes(1), 0) & 63) | 128) ||
                        -- Remaining 7 bytes: all random
                        gen_random_bytes(7);

                    RETURN encode(uuid_bytes, 'hex')::UUID;
                END;
                $$ LANGUAGE plpgsql VOLATILE;
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Core events table; (stream_id, event_version) is the optimistic
            // concurrency key.
            sqlx::query(
                r"
                CREATE TABLE IF NOT EXISTS events (
                    event_id UUID PRIMARY KEY DEFAULT gen_uuidv7(),
                    stream_id VARCHAR(255) NOT NULL,
                    event_version BIGINT NOT NULL,
                    event_type VARCHAR(255) NOT NULL,
                    event_data JSONB NOT NULL,
                    metadata JSONB,
                    causation_id UUID,
                    correlation_id VARCHAR(255),
                    user_id VARCHAR(255),
                    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    UNIQUE(stream_id, event_version)
                )
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_stream_id ON events(stream_id)")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at)")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query(
                "CREATE INDEX IF NOT EXISTS idx_events_correlation_id ON events(correlation_id)",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Composite index supporting ordered multi-stream reads.
            sqlx::query(
                "CREATE INDEX IF NOT EXISTS idx_events_multistream_any ON events(stream_id, event_id)",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Ensure the UUIDv7 default is applied even if the table predates it.
            sqlx::query("ALTER TABLE events ALTER COLUMN event_id SET DEFAULT gen_uuidv7()")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            // Row-level trigger: enforces ExpectedVersion::New semantics and
            // serializes writers per stream via an advisory transaction lock.
            sqlx::query(
                r"
                CREATE OR REPLACE FUNCTION check_basic_event_constraints() RETURNS TRIGGER AS $$
                DECLARE
                    current_max_version BIGINT;
                BEGIN
                    -- Lock the stream for this transaction to ensure sequential versioning
                    -- This prevents gaps when multiple events are inserted in parallel
                    PERFORM pg_advisory_xact_lock(hashtext(NEW.stream_id));

                    -- For version 1, this is ExpectedVersion::New - stream MUST be empty
                    IF NEW.event_version = 1 THEN
                        SELECT COALESCE(MAX(event_version), 0) INTO current_max_version
                        FROM events
                        WHERE stream_id = NEW.stream_id;

                        IF current_max_version != 0 THEN
                            RAISE EXCEPTION 'Version conflict for stream %: cannot insert version 1 when stream already has events (current max: %)',
                                NEW.stream_id, current_max_version
                                USING ERRCODE = '40001';
                        END IF;
                    END IF;

                    -- Note: event_id is automatically generated by column default
                    -- Comprehensive gap detection is handled by statement-level trigger

                    RETURN NEW;
                END;
                $$ LANGUAGE plpgsql
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Statement-level trigger: after each INSERT statement, verify
            // that the versions written in this transaction are contiguous
            // per stream.
            sqlx::query(
                r"
                CREATE OR REPLACE FUNCTION validate_batch_versions() RETURNS TRIGGER AS $$
                DECLARE
                    stream_record RECORD;
                    previous_max_version BIGINT;
                    min_new_version BIGINT;
                    max_new_version BIGINT;
                    expected_count BIGINT;
                    actual_count BIGINT;
                    current_txid BIGINT;
                BEGIN
                    -- Get current transaction ID
                    current_txid := txid_current();

                    -- Get all streams that had events inserted in this statement
                    -- We'll identify them by events whose xmin matches current transaction
                    FOR stream_record IN
                        SELECT DISTINCT stream_id
                        FROM events
                        WHERE xmin::text::bigint = current_txid
                    LOOP
                        -- Get the previous max version (before this transaction)
                        SELECT COALESCE(MAX(event_version), 0) INTO previous_max_version
                        FROM events
                        WHERE stream_id = stream_record.stream_id
                        AND xmin::text::bigint != current_txid;

                        -- Get the range of new versions for this stream in this transaction
                        SELECT MIN(event_version), MAX(event_version), COUNT(*)
                        INTO min_new_version, max_new_version, actual_count
                        FROM events
                        WHERE stream_id = stream_record.stream_id
                        AND xmin::text::bigint = current_txid;

                        -- Validate that new versions start immediately after previous max
                        IF min_new_version != previous_max_version + 1 THEN
                            RAISE EXCEPTION 'Version gap detected for stream %: expected first new version %, got %',
                                stream_record.stream_id, previous_max_version + 1, min_new_version
                                USING ERRCODE = '40001';
                        END IF;

                        -- Validate no gaps within the batch
                        expected_count := max_new_version - min_new_version + 1;
                        IF actual_count != expected_count THEN
                            RAISE EXCEPTION 'Version gaps detected within batch for stream %: expected % contiguous versions, got %',
                                stream_record.stream_id, expected_count, actual_count
                                USING ERRCODE = '40001';
                        END IF;
                    END LOOP;

                    RETURN NULL; -- Result is ignored for AFTER triggers
                END;
                $$ LANGUAGE plpgsql
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Drop any previous trigger versions before recreating them.
            sqlx::query("DROP TRIGGER IF EXISTS enforce_event_version ON events")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query("DROP TRIGGER IF EXISTS check_version_gaps_after_insert ON events")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query("DROP TRIGGER IF EXISTS enforce_basic_event_constraints ON events")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query("DROP TRIGGER IF EXISTS validate_batch_versions_after_insert ON events")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query(
                r"
                CREATE TRIGGER enforce_basic_event_constraints
                BEFORE INSERT ON events
                FOR EACH ROW
                EXECUTE FUNCTION check_basic_event_constraints()
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            sqlx::query(
                r"
                CREATE TRIGGER validate_batch_versions_after_insert
                AFTER INSERT ON events
                FOR EACH STATEMENT
                EXECUTE FUNCTION validate_batch_versions()
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Remove the stream foreign key if an earlier schema version
            // created it; stream existence is derived from events instead.
            sqlx::query("ALTER TABLE events DROP CONSTRAINT IF EXISTS fk_events_stream_id")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            info!("Database schema initialized successfully");
            Ok::<(), PostgresError>(())
        }
        .await;

        // Always release the advisory lock, even if initialization failed.
        let _unlock_result = sqlx::query("SELECT pg_advisory_unlock($1)")
            .bind(SCHEMA_LOCK_ID)
            .execute(self.pool.as_ref())
            .await;

        result
    }

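    /// Runs a comprehensive health check: a basic connectivity probe, a pool
    /// snapshot, schema verification, and a query performance probe.
    ///
    /// A sketch of periodic polling (`store` is an assumed, initialized
    /// `PostgresEventStore`):
    ///
    /// ```ignore
    /// let health = store.health_check().await?;
    /// if !health.is_healthy || !health.performance_status.is_performant {
    ///     tracing::warn!(?health, "postgres event store degraded");
    /// }
    /// ```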
    #[instrument(skip(self))]
    pub async fn health_check(&self) -> Result<HealthStatus, PostgresError> {
        let start = std::time::Instant::now();

        // Basic connectivity probe; any failure propagates as unhealthy.
        sqlx::query("SELECT 1")
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

        let basic_latency = start.elapsed();

        let pool_status = self.get_pool_status()?;
        let schema_status = self.verify_schema().await?;
        let performance_status = self.check_performance().await?;

        let status = HealthStatus {
            is_healthy: true,
            basic_latency,
            pool_status,
            schema_status,
            performance_status,
            last_check: chrono::Utc::now(),
        };

        debug!("PostgreSQL health check passed: {:?}", status);
        Ok(status)
    }

    /// Returns a point-in-time snapshot of the connection pool.
    fn get_pool_status(&self) -> Result<PoolStatus, PostgresError> {
        let pool = self.pool.as_ref();

        Ok(PoolStatus {
            #[allow(clippy::cast_possible_truncation)]
            size: pool.size() as u32,
            #[allow(clippy::cast_possible_truncation)]
            idle: pool.num_idle() as u32,
            is_closed: pool.is_closed(),
        })
    }

    /// Verifies which schema objects are present.
    async fn verify_schema(&self) -> Result<SchemaStatus, PostgresError> {
        let has_events_table = self.table_exists("events").await?;
        // The current schema derives streams from events rather than keeping
        // a separate streams table.
        let has_streams_table = false;
        let has_subscriptions_table = self
            .table_exists("subscription_checkpoints")
            .await
            .unwrap_or(false);

        Ok(SchemaStatus {
            has_events_table,
            has_streams_table,
            has_subscriptions_table,
            // Only the events table is required for normal operation.
            is_complete: has_events_table,
        })
    }

    /// Returns whether a table with the given name exists in the public schema.
    async fn table_exists(&self, table_name: &str) -> Result<bool, PostgresError> {
        let exists = sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1 AND table_schema = 'public')"
        )
        .bind(table_name)
        .fetch_one(self.pool.as_ref())
        .await
        .map_err(PostgresError::Connection)?;

        Ok(exists)
    }

    /// Probes query performance with a cheap aggregate over recent events.
    async fn check_performance(&self) -> Result<PerformanceStatus, PostgresError> {
        let start = std::time::Instant::now();

        // Tolerate failure (e.g. a missing table) by defaulting to zero.
        let _count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM events WHERE created_at > NOW() - INTERVAL '1 minute'",
        )
        .fetch_one(self.pool.as_ref())
        .await
        .unwrap_or(0);

        let query_latency = start.elapsed();

        Ok(PerformanceStatus {
            query_latency,
            // 100ms threshold for the probe query.
            is_performant: query_latency < Duration::from_millis(100),
        })
    }

    /// Attempts to recover database connectivity after failures.
    ///
    /// Closes the current pool, then builds and verifies a fresh pool from
    /// the same configuration. Note that the store keeps its original (now
    /// closed) pool; callers typically rebuild the store after a successful
    /// recovery.
    pub async fn recover_connection(&self) -> Result<(), PostgresError> {
        debug!("Attempting connection recovery");

        if !self.pool.is_closed() {
            self.pool.close().await;
        }

        // Build a temporary pool to confirm the database is reachable again.
        let test_pool = Self::create_pool(&self.config).await?;

        sqlx::query("SELECT 1")
            .execute(&test_pool)
            .await
            .map_err(PostgresError::Connection)?;

        test_pool.close().await;

        info!("Connection recovery completed successfully");
        Ok(())
    }

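    /// Returns a metrics snapshot combining monitor state with the current
    /// pool status. A small sketch (assumes `PoolMetrics` implements `Debug`):
    ///
    /// ```ignore
    /// let metrics = store.get_pool_metrics();
    /// tracing::info!(?metrics, "pool metrics snapshot");
    /// ```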
    pub fn get_pool_metrics(&self) -> monitoring::PoolMetrics {
        let pool_status = PoolStatus {
            #[allow(clippy::cast_possible_truncation)]
            size: self.pool.size() as u32,
            #[allow(clippy::cast_possible_truncation)]
            idle: self.pool.num_idle() as u32,
            is_closed: self.pool.is_closed(),
        };
        self.monitor.get_metrics(&pool_status)
    }

    /// Returns a shared handle to the pool monitor.
    pub fn monitor(&self) -> Arc<monitoring::PoolMonitor> {
        Arc::clone(&self.monitor)
    }

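    /// Spawns a background task that periodically samples pool status at the
    /// configured `health_check_interval`.
    ///
    /// Returns the task handle and a watch sender used to request shutdown;
    /// a minimal usage sketch:
    ///
    /// ```ignore
    /// let (task, stop) = store.start_pool_monitoring();
    /// // ... later, during shutdown:
    /// let _ = stop.send(true);
    /// task.await?;
    /// ```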
    pub fn start_pool_monitoring(
        &self,
    ) -> (
        tokio::task::JoinHandle<()>,
        tokio::sync::watch::Sender<bool>,
    ) {
        let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
        let monitor = Arc::clone(&self.monitor);
        let pool_ref = Arc::clone(&self.pool);
        let interval = self.config.health_check_interval;

        let task = tokio::spawn(async move {
            let monitoring_task = monitoring::PoolMonitoringTask::new(monitor, interval, stop_rx);

            monitoring_task
                .run(move || PoolStatus {
                    #[allow(clippy::cast_possible_truncation)]
                    size: pool_ref.size() as u32,
                    #[allow(clippy::cast_possible_truncation)]
                    idle: pool_ref.num_idle() as u32,
                    is_closed: pool_ref.is_closed(),
                })
                .await;
        });

        (task, stop_tx)
    }

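    /// Reads events from the given streams one page at a time, returning a
    /// continuation token whenever more events remain.
    ///
    /// A sketch of draining all pages (`streams` and `options` are assumed to
    /// be in scope):
    ///
    /// ```ignore
    /// let mut token = None;
    /// loop {
    ///     let (events, next) = store.read_paginated(&streams, &options, token).await?;
    ///     // ... apply the events ...
    ///     if next.is_none() {
    ///         break;
    ///     }
    ///     token = next;
    /// }
    /// ```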
    pub async fn read_paginated(
        &self,
        stream_ids: &[StreamId],
        options: &ReadOptions,
        continuation_token: Option<EventId>,
    ) -> Result<(Vec<StoredEvent<E>>, Option<EventId>), EventStoreError>
    where
        E: Serialize
            + for<'de> Deserialize<'de>
            + Send
            + Sync
            + Clone
            + std::fmt::Debug
            + PartialEq
            + Eq
            + 'static,
    {
        self.read_streams_paginated_impl(stream_ids, options, continuation_token)
            .await
    }
}

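/// Fluent builder for [`PostgresConfig`].
///
/// # Example
///
/// ```ignore
/// let config = PostgresConfigBuilder::new()
///     .database_url("postgres://user:pass@localhost/test")
///     .max_connections(20)
///     .read_batch_size(2000)
///     .build();
/// ```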
#[derive(Debug, Default)]
pub struct PostgresConfigBuilder {
    config: PostgresConfig,
}

impl PostgresConfigBuilder {
    /// Creates a builder populated with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the database URL.
    #[must_use]
    pub fn database_url(mut self, url: impl Into<String>) -> Self {
        self.config.database_url = url.into();
        self
    }

    /// Sets the maximum number of pooled connections.
    #[must_use]
    pub const fn max_connections(mut self, max: u32) -> Self {
        self.config.max_connections = max;
        self
    }

    /// Sets the minimum number of pooled connections.
    #[must_use]
    pub const fn min_connections(mut self, min: u32) -> Self {
        self.config.min_connections = min;
        self
    }

    /// Sets the connection acquisition timeout.
    #[must_use]
    pub const fn connect_timeout(mut self, timeout: Duration) -> Self {
        self.config.connect_timeout = timeout;
        self
    }

    /// Sets the maximum lifetime of a pooled connection.
    #[must_use]
    pub const fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
        self.config.max_lifetime = lifetime;
        self
    }

    /// Sets the idle timeout for pooled connections.
    #[must_use]
    pub const fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.config.idle_timeout = timeout;
        self
    }

    /// Sets whether connections are tested before being handed out.
    #[must_use]
    pub const fn test_before_acquire(mut self, test: bool) -> Self {
        self.config.test_before_acquire = test;
        self
    }

    /// Sets the per-statement query timeout.
    #[must_use]
    pub const fn query_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.config.query_timeout = timeout;
        self
    }

    /// Sets the interval between periodic health checks.
    #[must_use]
    pub const fn health_check_interval(mut self, interval: Duration) -> Self {
        self.config.health_check_interval = interval;
        self
    }

    /// Sets the maximum number of retry attempts.
    #[must_use]
    pub const fn max_retries(mut self, retries: u32) -> Self {
        self.config.max_retries = retries;
        self
    }

    /// Sets the base delay for retry backoff.
    #[must_use]
    pub const fn retry_base_delay(mut self, delay: Duration) -> Self {
        self.config.retry_base_delay = delay;
        self
    }

    /// Sets the maximum delay for retry backoff.
    #[must_use]
    pub const fn retry_max_delay(mut self, delay: Duration) -> Self {
        self.config.retry_max_delay = delay;
        self
    }

    /// Enables or disables automatic connection recovery.
    #[must_use]
    pub const fn enable_recovery(mut self, enable: bool) -> Self {
        self.config.enable_recovery = enable;
        self
    }

    /// Sets the page size used when reading streams.
    #[must_use]
    pub const fn read_batch_size(mut self, size: usize) -> Self {
        self.config.read_batch_size = size;
        self
    }

    /// Consumes the builder and returns the finished configuration.
    pub fn build(self) -> PostgresConfig {
        self.config
    }

    /// Applies a preset tuned for throughput: a larger pool, tighter
    /// timeouts, and no pre-acquire connection testing.
    #[must_use]
    pub const fn performance_optimized(mut self) -> Self {
        self.config.max_connections = 30;
        self.config.min_connections = 5;
        self.config.connect_timeout = Duration::from_secs(5);
        self.config.query_timeout = Some(Duration::from_secs(10));
        self.config.max_lifetime = Some(Duration::from_secs(1800));
        self.config.idle_timeout = Some(Duration::from_secs(300));
        self.config.test_before_acquire = false;
        self
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_postgres_config_default() {
        let config = PostgresConfig::default();
        assert_eq!(config.max_connections, 20);
        assert_eq!(config.min_connections, 2);
        assert_eq!(config.connect_timeout, Duration::from_secs(10));
        assert!(!config.test_before_acquire);
        assert_eq!(config.read_batch_size, 1000);
    }

    #[test]
    fn test_postgres_config_builder() {
        let config = PostgresConfigBuilder::new()
            .database_url("postgres://user:pass@localhost/test")
            .max_connections(20)
            .min_connections(2)
            .connect_timeout(Duration::from_secs(10))
            .test_before_acquire(false)
            .read_batch_size(2000)
            .build();

        assert_eq!(config.database_url, "postgres://user:pass@localhost/test");
        assert_eq!(config.max_connections, 20);
        assert_eq!(config.min_connections, 2);
        assert_eq!(config.connect_timeout, Duration::from_secs(10));
        assert!(!config.test_before_acquire);
        assert_eq!(config.read_batch_size, 2000);
    }

    #[test]
    fn test_postgres_error_conversion() {
        let postgres_error = PostgresError::Transaction("test error".to_string());
        let event_store_error: EventStoreError = postgres_error.into();

        assert!(matches!(
            event_store_error,
            EventStoreError::TransactionRollback(_)
        ));
    }

    #[test]
    fn test_postgres_config_new() {
        let config = PostgresConfig::new("postgres://custom/db");
        assert_eq!(config.database_url, "postgres://custom/db");
        assert_eq!(config.max_connections, 20);
        assert_eq!(config.min_connections, 2);
        assert_eq!(config.connect_timeout, Duration::from_secs(10));
    }

    #[test]
    fn test_postgres_config_all_fields() {
        let config = PostgresConfig {
            database_url: "postgres://test/db".to_string(),
            max_connections: 50,
            min_connections: 10,
            connect_timeout: Duration::from_secs(5),
            query_timeout: Some(Duration::from_secs(60)),
            max_lifetime: Some(Duration::from_secs(3600)),
            idle_timeout: Some(Duration::from_secs(300)),
            test_before_acquire: true,
            max_retries: 5,
            retry_base_delay: Duration::from_millis(200),
            retry_max_delay: Duration::from_secs(10),
            enable_recovery: false,
            health_check_interval: Duration::from_secs(60),
            read_batch_size: 500,
            serialization_format: SerializationFormat::Json,
        };

        assert_eq!(config.max_connections, 50);
        assert_eq!(config.min_connections, 10);
        assert_eq!(config.connect_timeout, Duration::from_secs(5));
        assert_eq!(config.query_timeout, Some(Duration::from_secs(60)));
        assert_eq!(config.max_lifetime, Some(Duration::from_secs(3600)));
        assert_eq!(config.idle_timeout, Some(Duration::from_secs(300)));
        assert!(config.test_before_acquire);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.retry_base_delay, Duration::from_millis(200));
        assert_eq!(config.retry_max_delay, Duration::from_secs(10));
        assert!(!config.enable_recovery);
        assert_eq!(config.health_check_interval, Duration::from_secs(60));
    }

    #[test]
    fn test_postgres_config_builder_all_methods() {
        let config = PostgresConfigBuilder::new()
            .database_url("postgres://builder/test")
            .max_connections(25)
            .min_connections(3)
            .connect_timeout(Duration::from_secs(8))
            .max_lifetime(Some(Duration::from_secs(1200)))
            .idle_timeout(Some(Duration::from_secs(240)))
            .test_before_acquire(true)
            .query_timeout(Some(Duration::from_secs(45)))
            .build();

        assert_eq!(config.database_url, "postgres://builder/test");
        assert_eq!(config.max_connections, 25);
        assert_eq!(config.min_connections, 3);
        assert_eq!(config.connect_timeout, Duration::from_secs(8));
        assert_eq!(config.max_lifetime, Some(Duration::from_secs(1200)));
        assert_eq!(config.idle_timeout, Some(Duration::from_secs(240)));
        assert!(config.test_before_acquire);
        assert_eq!(config.query_timeout, Some(Duration::from_secs(45)));
    }

    #[test]
    fn test_postgres_config_builder_performance_optimized() {
        let config = PostgresConfigBuilder::new()
            .database_url("postgres://perf/test")
            .performance_optimized()
            .build();

        assert_eq!(config.database_url, "postgres://perf/test");
        assert_eq!(config.max_connections, 30);
        assert_eq!(config.min_connections, 5);
        assert_eq!(config.connect_timeout, Duration::from_secs(5));
        assert_eq!(config.query_timeout, Some(Duration::from_secs(10)));
        assert_eq!(config.max_lifetime, Some(Duration::from_secs(1800)));
        assert_eq!(config.idle_timeout, Some(Duration::from_secs(300)));
        assert!(!config.test_before_acquire);
    }

    #[test]
    fn test_pool_status_fields() {
        let status = PoolStatus {
            size: 10,
            idle: 3,
            is_closed: false,
        };

        assert_eq!(status.size, 10);
        assert_eq!(status.idle, 3);
        assert!(!status.is_closed);
    }

    #[test]
    fn test_health_status_fields() {
        let health = HealthStatus {
            is_healthy: true,
            basic_latency: Duration::from_millis(5),
            pool_status: PoolStatus {
                size: 20,
                idle: 15,
                is_closed: false,
            },
            schema_status: SchemaStatus {
                has_events_table: true,
                has_streams_table: true,
                has_subscriptions_table: false,
                is_complete: true,
            },
            performance_status: PerformanceStatus {
                query_latency: Duration::from_millis(10),
                is_performant: true,
            },
            last_check: chrono::Utc::now(),
        };

        assert!(health.is_healthy);
        assert_eq!(health.basic_latency, Duration::from_millis(5));
        assert_eq!(health.pool_status.size, 20);
        assert!(health.schema_status.is_complete);
        assert!(health.performance_status.is_performant);
    }
}