eventcore_postgres/
lib.rs

//! `PostgreSQL` adapter for the `EventCore` event sourcing library
//!
//! This crate provides a `PostgreSQL` implementation of the `EventStore` trait
//! from the eventcore crate, enabling persistent event storage with
//! multi-stream atomicity support.
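//!
//! # Quick start
//!
//! A minimal sketch, assuming a reachable database; the URL and the
//! `MyEvent` type are illustrative:
//!
//! ```no_run
//! # #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
//! # enum MyEvent { Created }
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! use eventcore_postgres::{PostgresConfig, PostgresEventStore};
//!
//! let config = PostgresConfig::new("postgres://localhost/eventcore");
//! let store: PostgresEventStore<MyEvent> = PostgresEventStore::new(config).await?;
//! store.initialize().await?;
//! # Ok(())
//! # }
//! ```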

#![forbid(unsafe_code)]
#![warn(missing_docs)]
#![allow(clippy::struct_excessive_bools)]
#![allow(clippy::unnecessary_wraps)]
#![allow(clippy::unnecessary_cast)]

pub mod circuit_breaker;
mod event_store;
pub mod monitoring;
pub mod retry;

use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use serde::{Deserialize, Serialize};

use chrono::{DateTime, Utc};
use eventcore::serialization::SerializationFormat;
use eventcore::{EventId, EventStoreError, ReadOptions, StoredEvent, StreamId};
use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
use thiserror::Error;
use tracing::{debug, info, instrument};

pub use circuit_breaker::{
    CircuitBreaker, CircuitBreakerConfig, CircuitBreakerError, CircuitBreakerMetrics, CircuitState,
};
pub use monitoring::{AcquisitionTimer, PoolMetrics, PoolMonitor, PoolMonitoringTask};
pub use retry::{RetryError, RetryStrategy};

/// Comprehensive health status information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
    /// Whether the database is healthy overall
    pub is_healthy: bool,
    /// Latency of basic connectivity check
    pub basic_latency: Duration,
    /// Connection pool status
    pub pool_status: PoolStatus,
    /// Database schema status
    pub schema_status: SchemaStatus,
    /// Performance metrics
    pub performance_status: PerformanceStatus,
    /// Timestamp of last health check
    pub last_check: DateTime<Utc>,
}

/// Connection pool status information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStatus {
    /// Current pool size
    pub size: u32,
    /// Number of idle connections
    pub idle: u32,
    /// Whether the pool is closed
    pub is_closed: bool,
}

/// Database schema status information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaStatus {
    /// Whether events table exists
    pub has_events_table: bool,
    /// Whether `event_streams` table exists
    pub has_streams_table: bool,
    /// Whether `subscription_checkpoints` table exists
    pub has_subscriptions_table: bool,
    /// Whether schema is complete for basic operations
    pub is_complete: bool,
}

/// Performance status information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceStatus {
    /// Latency of performance test query
    pub query_latency: Duration,
    /// Whether performance is within acceptable thresholds
    pub is_performant: bool,
}

/// Configuration for `PostgreSQL` connection with production-hardening features
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostgresConfig {
    /// Database connection URL
    pub database_url: String,

    /// Maximum number of connections in the pool
    pub max_connections: u32,

    /// Minimum idle connections to maintain
    pub min_connections: u32,

    /// Connection timeout
    pub connect_timeout: Duration,

    /// Query timeout for individual database operations.
    /// This is distinct from the connection timeout and applies to query execution.
    pub query_timeout: Option<Duration>,

    /// Maximum lifetime of a connection
    pub max_lifetime: Option<Duration>,

    /// Idle timeout for connections
    pub idle_timeout: Option<Duration>,

    /// Whether to test connections before use
    pub test_before_acquire: bool,

    /// Maximum number of retry attempts for failed operations
    pub max_retries: u32,

    /// Base delay between retry attempts
    pub retry_base_delay: Duration,

    /// Maximum delay between retry attempts (for exponential backoff)
    pub retry_max_delay: Duration,

    /// Whether to enable connection recovery on failures
    pub enable_recovery: bool,

    /// Interval for periodic health checks
    pub health_check_interval: Duration,

    /// Batch size for reading events from multiple streams.
    /// This controls how many events are fetched in a single query.
    pub read_batch_size: usize,

    /// Serialization format to use for event payloads.
    /// Defaults to JSON for backward compatibility.
    pub serialization_format: SerializationFormat,
}

impl PostgresConfig {
    /// Create a new configuration with just a database URL, using defaults for other settings
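    ///
    /// # Example
    ///
    /// A plain construction sketch; the URL is illustrative:
    /// ```
    /// use eventcore_postgres::PostgresConfig;
    ///
    /// let config = PostgresConfig::new("postgres://user:pass@localhost/eventcore");
    /// assert_eq!(config.max_connections, 20);
    /// ```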
    pub fn new(database_url: impl Into<String>) -> Self {
        Self {
            database_url: database_url.into(),
            ..Self::default()
        }
    }
}

impl Default for PostgresConfig {
    fn default() -> Self {
        Self {
            database_url: "postgres://postgres:postgres@localhost/eventcore".to_string(),
            max_connections: 20, // Increased for better concurrency
            min_connections: 2,  // Keep minimum connections open
            connect_timeout: Duration::from_secs(10), // Faster timeout for performance
            query_timeout: Some(Duration::from_secs(30)), // 30 second default query timeout
            max_lifetime: Some(Duration::from_secs(1800)), // 30 minutes
            idle_timeout: Some(Duration::from_secs(600)), // 10 minutes
            test_before_acquire: false, // Skip validation for performance
            max_retries: 3,      // Retry failed operations up to 3 times
            retry_base_delay: Duration::from_millis(100), // Start with 100ms delay
            retry_max_delay: Duration::from_secs(5), // Maximum 5 second delay
            enable_recovery: true, // Enable automatic connection recovery
            health_check_interval: Duration::from_secs(30), // Check health every 30 seconds
            read_batch_size: 1000, // Default batch size for multi-stream reads
            serialization_format: SerializationFormat::default(), // JSON by default
        }
    }
}

/// PostgreSQL-specific errors
#[derive(Debug, Error)]
pub enum PostgresError {
    /// Database connection error
    #[error("Database connection error: {0}")]
    Connection(#[from] sqlx::Error),

    /// Pool creation error
    #[error("Failed to create connection pool: {0}")]
    PoolCreation(String),

    /// Migration error
    #[error("Database migration error: {0}")]
    Migration(String),

    /// Serialization error
    #[error("JSON serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// Transaction error
    #[error("Transaction error: {0}")]
    Transaction(String),
}

#[allow(clippy::fallible_impl_from)] // The unwrap() calls below are on values that are statically valid and can never fail
impl From<PostgresError> for EventStoreError {
    fn from(error: PostgresError) -> Self {
        match error {
            PostgresError::Connection(sqlx_error) => {
                // Handle specific sqlx errors
                use sqlx::Error::{
                    Configuration, Database, Io, PoolClosed, PoolTimedOut, Protocol, RowNotFound,
                    Tls,
                };
                match &sqlx_error {
                    Configuration(_) => Self::Configuration(sqlx_error.to_string()),
                    Database(db_err) => {
                        // Check for specific PostgreSQL error codes
                        if let Some(code) = db_err.code() {
                            match code.as_ref() {
                                "23505" => {
                                    // PostgreSQL unique violation
                                    return Self::ConnectionFailed(format!(
                                        "Unique constraint violation: {db_err}"
                                    ));
                                }
                                "40001" => {
                                    // Serialization failure - transaction conflict in SERIALIZABLE isolation
                                    // Convert to VersionConflict which will be converted to ConcurrencyConflict by the executor
                                    // We can't determine the specific stream, so create a generic conflict
                                    use eventcore::{EventVersion, StreamId};
                                    return Self::VersionConflict {
                                        stream: StreamId::try_new("serialization-conflict")
                                            .unwrap(),
                                        expected: EventVersion::initial(),
                                        current: EventVersion::try_new(1).unwrap(),
                                    };
                                }
                                _ => {}
                            }
                        }

                        // Also check the error message for serialization failures that might not have the code
                        let error_msg = db_err.to_string().to_lowercase();
                        if error_msg.contains("could not serialize access due to concurrent update")
                            || error_msg.contains("serialization failure")
                        {
                            use eventcore::{EventVersion, StreamId};
                            return Self::VersionConflict {
                                stream: StreamId::try_new("serialization-conflict").unwrap(),
                                expected: EventVersion::initial(),
                                current: EventVersion::try_new(1).unwrap(),
                            };
                        }

                        Self::ConnectionFailed(db_err.to_string())
                    }
                    Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed => {
                        Self::ConnectionFailed(sqlx_error.to_string())
                    }
                    RowNotFound => Self::ConnectionFailed(format!("Row not found: {sqlx_error}")),
                    _ => Self::Internal(sqlx_error.to_string()),
                }
            }
            PostgresError::PoolCreation(msg) => Self::ConnectionFailed(msg),
            PostgresError::Migration(msg) => Self::Configuration(msg),
            PostgresError::Serialization(err) => Self::SerializationFailed(err.to_string()),
            PostgresError::Transaction(msg) => Self::TransactionRollback(msg),
        }
    }
}

/// `PostgreSQL` event store implementation
///
/// TODO: The `serialization_format` configuration is currently stored but not used.
/// The `PostgreSQL` adapter still uses hardcoded JSON serialization. Implementing
/// pluggable serialization formats requires changes to the database schema and
/// query logic to handle binary data instead of JSON columns.
pub struct PostgresEventStore<E>
where
    E: Send + Sync,
{
    pool: Arc<PgPool>,
    config: PostgresConfig,
    retry_strategy: RetryStrategy,
    monitor: Arc<monitoring::PoolMonitor>,
    /// Phantom data to track event type
    _phantom: PhantomData<E>,
}

impl<E> Clone for PostgresEventStore<E>
where
    E: Serialize
        + for<'de> Deserialize<'de>
        + Send
        + Sync
        + std::fmt::Debug
        + Clone
        + PartialEq
        + Eq
        + 'static,
{
    fn clone(&self) -> Self {
        Self {
            pool: Arc::clone(&self.pool),
            config: self.config.clone(),
            retry_strategy: self.retry_strategy.clone(),
            monitor: Arc::clone(&self.monitor),
            _phantom: PhantomData,
        }
    }
}

impl<E> PostgresEventStore<E>
where
    E: Serialize
        + for<'de> Deserialize<'de>
        + Send
        + Sync
        + std::fmt::Debug
        + Clone
        + PartialEq
        + Eq
        + 'static,
{
    /// Create a new `PostgreSQL` event store with the given configuration
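    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a reachable database; the URL and the
    /// `MyEvent` type are illustrative:
    /// ```no_run
    /// # use eventcore_postgres::{PostgresConfig, PostgresEventStore};
    /// # #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
    /// # enum MyEvent { Created }
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let config = PostgresConfig::new("postgres://localhost/eventcore");
    /// let store: PostgresEventStore<MyEvent> = PostgresEventStore::new(config).await?;
    /// # Ok(())
    /// # }
    /// ```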
    pub async fn new(config: PostgresConfig) -> Result<Self, PostgresError> {
        let pool = Self::create_pool(&config).await?;

        let retry_strategy = RetryStrategy {
            max_attempts: config.max_retries,
            base_delay: config.retry_base_delay,
            max_delay: config.retry_max_delay,
            ..RetryStrategy::default()
        };

        let monitor = Arc::new(monitoring::PoolMonitor::new(config.max_connections));

        Ok(Self {
            pool: Arc::new(pool),
            config,
            retry_strategy,
            monitor,
            _phantom: PhantomData,
        })
    }

    /// Create a new `PostgreSQL` event store with a custom retry strategy
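    ///
    /// # Example
    ///
    /// A sketch using the default strategy as the custom value; substitute
    /// your own `RetryStrategy` (the `MyEvent` type is illustrative):
    /// ```no_run
    /// # use eventcore_postgres::{PostgresConfig, PostgresEventStore, RetryStrategy};
    /// # #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
    /// # enum MyEvent { Created }
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let store: PostgresEventStore<MyEvent> = PostgresEventStore::new_with_retry_strategy(
    ///     PostgresConfig::default(),
    ///     RetryStrategy::default(),
    /// )
    /// .await?;
    /// # Ok(())
    /// # }
    /// ```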
    pub async fn new_with_retry_strategy(
        config: PostgresConfig,
        retry_strategy: RetryStrategy,
    ) -> Result<Self, PostgresError> {
        let pool = Self::create_pool(&config).await?;
        let monitor = Arc::new(monitoring::PoolMonitor::new(config.max_connections));

        Ok(Self {
            pool: Arc::new(pool),
            config,
            retry_strategy,
            monitor,
            _phantom: PhantomData,
        })
    }

    /// Create a connection pool from configuration
    async fn create_pool(config: &PostgresConfig) -> Result<PgPool, PostgresError> {
        let mut connect_options: PgConnectOptions = config
            .database_url
            .parse()
            .map_err(|e| PostgresError::PoolCreation(format!("Invalid database URL: {e}")))?;

        // Apply query timeout if configured
        if let Some(query_timeout) = config.query_timeout {
            // Convert Duration to postgres statement timeout format (milliseconds)
            // Safe to cast as we're unlikely to have timeouts > u64::MAX milliseconds
            #[allow(clippy::cast_possible_truncation)]
            let timeout_ms = query_timeout.as_millis() as u64;
            connect_options =
                connect_options.options([("statement_timeout", &timeout_ms.to_string())]);
        }

        let mut pool_options = PgPoolOptions::new()
            .max_connections(config.max_connections)
            .min_connections(config.min_connections)
            .acquire_timeout(config.connect_timeout)
            .test_before_acquire(config.test_before_acquire);

        if let Some(max_lifetime) = config.max_lifetime {
            pool_options = pool_options.max_lifetime(max_lifetime);
        }

        if let Some(idle_timeout) = config.idle_timeout {
            pool_options = pool_options.idle_timeout(idle_timeout);
        }

        pool_options
            .connect_with(connect_options)
            .await
            .map_err(|e| PostgresError::PoolCreation(format!("Failed to create pool: {e}")))
    }

    /// Get a reference to the connection pool
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Get the configuration
    pub const fn config(&self) -> &PostgresConfig {
        &self.config
    }

    /// Run database migrations
    #[allow(clippy::unused_async)]
    pub async fn migrate(&self) -> Result<(), PostgresError> {
        // TODO: Implement migrations in Phase 8.3
        info!("Database migrations will be implemented in Phase 8.3");
        Ok(())
    }

    /// Check if schema tables exist
    async fn schema_exists(&self) -> Result<bool, PostgresError> {
        // Note: the event_streams table is no longer used as of migration 007,
        // so only the events table is required for a complete schema
        let count = sqlx::query_scalar::<_, i64>(
            r"
            SELECT COUNT(*) FROM information_schema.tables
            WHERE table_name = 'events'
            AND table_schema = 'public'
            ",
        )
        .fetch_one(self.pool.as_ref())
        .await
        .map_err(PostgresError::Connection)?;

        Ok(count >= 1)
    }

    /// Try to acquire schema initialization lock
    async fn try_acquire_schema_lock(&self, lock_id: i64) -> Result<bool, PostgresError> {
        sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock($1)")
            .bind(lock_id)
            .fetch_one(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)
    }

    /// Wait for schema initialization by another process
    async fn wait_for_schema_initialization(&self, lock_id: i64) -> Result<bool, PostgresError> {
        // Try to acquire the lock again, polling for up to ~5 seconds (50 × 100ms)
        for _ in 0..50 {
            tokio::time::sleep(Duration::from_millis(100)).await;

            if self.try_acquire_schema_lock(lock_id).await? {
                // We got the lock
                return Ok(true);
            }

            // Check if tables were created while we were waiting
            if self.schema_exists().await? {
                debug!("Database schema initialized by another process while waiting");
                return Ok(false);
            }
        }

        // Final check
        if self.try_acquire_schema_lock(lock_id).await? {
            Ok(true)
        } else if self.schema_exists().await? {
            debug!("Database schema initialized by another process");
            Ok(false)
        } else {
            Err(PostgresError::Migration(
                "Failed to acquire schema initialization lock after multiple attempts".to_string(),
            ))
        }
    }

    /// Initialize the database schema
    ///
    /// This method creates the necessary tables and indexes for the event store.
    /// It is idempotent and can be called multiple times safely.
    /// Uses `PostgreSQL` advisory locks to prevent concurrent initialization conflicts.
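    ///
    /// # Example
    ///
    /// A typical startup sketch, assuming a reachable database (the URL and
    /// the `MyEvent` type are illustrative):
    /// ```no_run
    /// # use eventcore_postgres::{PostgresConfig, PostgresEventStore};
    /// # #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
    /// # enum MyEvent { Created }
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let store: PostgresEventStore<MyEvent> =
    ///     PostgresEventStore::new(PostgresConfig::new("postgres://localhost/eventcore")).await?;
    /// store.initialize().await?; // idempotent: safe to run on every startup
    /// # Ok(())
    /// # }
    /// ```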
    #[allow(clippy::too_many_lines)]
    pub async fn initialize(&self) -> Result<(), PostgresError> {
        // Use PostgreSQL advisory lock to prevent concurrent schema initialization
        // Lock ID 123456789 is arbitrary but consistent for schema initialization
        const SCHEMA_LOCK_ID: i64 = 123_456_789;

        // Try to acquire advisory lock
        let mut lock_acquired = self.try_acquire_schema_lock(SCHEMA_LOCK_ID).await?;

        if !lock_acquired {
            // Another process is initializing the schema, wait briefly and check if schema exists
            tokio::time::sleep(Duration::from_millis(100)).await;

            // Check if tables exist - if they do, initialization was completed by another process
            if self.schema_exists().await? {
                debug!("Database schema already initialized by another process");
                return Ok(());
            }

            // Wait for schema initialization or acquire lock
            lock_acquired = self.wait_for_schema_initialization(SCHEMA_LOCK_ID).await?;
            if !lock_acquired {
                // Schema was initialized by another process
                return Ok(());
            }
        }

        // We have the lock, proceed with initialization
        let result = async {
            // Enable pgcrypto extension for random bytes generation (must be first)
            sqlx::query("CREATE EXTENSION IF NOT EXISTS pgcrypto")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            // Create function to generate UUIDv7 (must be before table creation)
            sqlx::query(
                r"
                CREATE OR REPLACE FUNCTION gen_uuidv7() RETURNS UUID AS $$
                DECLARE
                    unix_ts_ms BIGINT;
                    uuid_bytes BYTEA;
                BEGIN
                    -- Get current timestamp in milliseconds since Unix epoch
                    unix_ts_ms := (extract(epoch from clock_timestamp()) * 1000)::BIGINT;

                    -- Create a 16-byte UUID:
                    -- Bytes 0-5: 48-bit big-endian timestamp (milliseconds since Unix epoch)
                    -- Bytes 6-7: 4-bit version (0111) + 12 random bits
                    -- Bytes 8-9: 2-bit variant (10) + 14 random bits
                    -- Bytes 10-15: 48 random bits

                    -- Build the UUID byte by byte
                    uuid_bytes :=
                        -- Timestamp: 48 bits, big-endian
                        substring(int8send(unix_ts_ms) from 3 for 6) ||
                        -- Version (0111 = 7) + random: 16 bits
                        -- First byte: 0111RRRR (where R is random)
                        set_byte(gen_random_bytes(1), 0, (get_byte(gen_random_bytes(1), 0) & 15) | 112) ||
                        -- Second byte: all random
                        gen_random_bytes(1) ||
                        -- Variant (10) + random: 16 bits
                        -- First byte: 10RRRRRR (where R is random)
                        set_byte(gen_random_bytes(1), 0, (get_byte(gen_random_bytes(1), 0) & 63) | 128) ||
                        -- Remaining 7 bytes: all random
                        gen_random_bytes(7);

                    RETURN encode(uuid_bytes, 'hex')::UUID;
                END;
                $$ LANGUAGE plpgsql VOLATILE;
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Create events table (now that the function exists)
            sqlx::query(
                r"
                CREATE TABLE IF NOT EXISTS events (
                    event_id UUID PRIMARY KEY DEFAULT gen_uuidv7(),
                    stream_id VARCHAR(255) NOT NULL,
                    event_version BIGINT NOT NULL,
                    event_type VARCHAR(255) NOT NULL,
                    event_data JSONB NOT NULL,
                    metadata JSONB,
                    causation_id UUID,
                    correlation_id VARCHAR(255),
                    user_id VARCHAR(255),
                    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    UNIQUE(stream_id, event_version)
                )
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Create indexes - must be separate queries
            sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_stream_id ON events(stream_id)")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at)")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query(
                "CREATE INDEX IF NOT EXISTS idx_events_correlation_id ON events(correlation_id)",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Create optimized index for multi-stream reads using the ANY($1) pattern
            // This composite index supports the common multi-stream read pattern
            sqlx::query(
                "CREATE INDEX IF NOT EXISTS idx_events_multistream_any ON events(stream_id, event_id)",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Migrate existing tables to have default event_id generation (for upgrading existing installations)
            sqlx::query("ALTER TABLE events ALTER COLUMN event_id SET DEFAULT gen_uuidv7()")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            // Create simplified row-level trigger for basic validation
            sqlx::query(
                r"
                CREATE OR REPLACE FUNCTION check_basic_event_constraints() RETURNS TRIGGER AS $$
                DECLARE
                    current_max_version BIGINT;
                BEGIN
                    -- Lock the stream for this transaction to ensure sequential versioning
                    -- This prevents gaps when multiple events are inserted in parallel
                    PERFORM pg_advisory_xact_lock(hashtext(NEW.stream_id));

                    -- For version 1, this is ExpectedVersion::New - stream MUST be empty
                    IF NEW.event_version = 1 THEN
                        SELECT COALESCE(MAX(event_version), 0) INTO current_max_version
                        FROM events
                        WHERE stream_id = NEW.stream_id;

                        IF current_max_version != 0 THEN
                            RAISE EXCEPTION 'Version conflict for stream %: cannot insert version 1 when stream already has events (current max: %)',
                                NEW.stream_id, current_max_version
                                USING ERRCODE = '40001';
                        END IF;
                    END IF;

                    -- Note: event_id is automatically generated by column default
                    -- Comprehensive gap detection is handled by statement-level trigger

                    RETURN NEW;
                END;
                $$ LANGUAGE plpgsql
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Create function for comprehensive batch gap detection
            sqlx::query(
                r"
                CREATE OR REPLACE FUNCTION validate_batch_versions() RETURNS TRIGGER AS $$
                DECLARE
                    stream_record RECORD;
                    previous_max_version BIGINT;
                    min_new_version BIGINT;
                    max_new_version BIGINT;
                    expected_count BIGINT;
                    actual_count BIGINT;
                    current_txid BIGINT;
                BEGIN
                    -- Get current transaction ID
                    -- (xmin below is a 32-bit xid while txid_current() includes
                    -- the epoch, so this comparison assumes the cluster has seen
                    -- fewer than 2^32 transactions)
                    current_txid := txid_current();

                    -- Get all streams that had events inserted in this statement
                    -- We'll identify them by events whose xmin matches current transaction
                    FOR stream_record IN
                        SELECT DISTINCT stream_id
                        FROM events
                        WHERE xmin::text::bigint = current_txid
                    LOOP
                        -- Get the previous max version (before this transaction)
                        SELECT COALESCE(MAX(event_version), 0) INTO previous_max_version
                        FROM events
                        WHERE stream_id = stream_record.stream_id
                        AND xmin::text::bigint != current_txid;

                        -- Get the range of new versions for this stream in this transaction
                        SELECT MIN(event_version), MAX(event_version), COUNT(*)
                        INTO min_new_version, max_new_version, actual_count
                        FROM events
                        WHERE stream_id = stream_record.stream_id
                        AND xmin::text::bigint = current_txid;

                        -- Validate that new versions start immediately after previous max
                        IF min_new_version != previous_max_version + 1 THEN
                            RAISE EXCEPTION 'Version gap detected for stream %: expected first new version %, got %',
                                stream_record.stream_id, previous_max_version + 1, min_new_version
                                USING ERRCODE = '40001';
                        END IF;

                        -- Validate no gaps within the batch
                        expected_count := max_new_version - min_new_version + 1;
                        IF actual_count != expected_count THEN
                            RAISE EXCEPTION 'Version gaps detected within batch for stream %: expected % contiguous versions, got %',
                                stream_record.stream_id, expected_count, actual_count
                                USING ERRCODE = '40001';
                        END IF;
                    END LOOP;

                    RETURN NULL; -- Result is ignored for AFTER triggers
                END;
                $$ LANGUAGE plpgsql
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Drop all existing triggers if they exist
            sqlx::query("DROP TRIGGER IF EXISTS enforce_event_version ON events")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query("DROP TRIGGER IF EXISTS check_version_gaps_after_insert ON events")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query("DROP TRIGGER IF EXISTS enforce_basic_event_constraints ON events")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            sqlx::query("DROP TRIGGER IF EXISTS validate_batch_versions_after_insert ON events")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            // Create row-level trigger for basic validation (ExpectedVersion::New)
            sqlx::query(
                r"
                CREATE TRIGGER enforce_basic_event_constraints
                    BEFORE INSERT ON events
                    FOR EACH ROW
                    EXECUTE FUNCTION check_basic_event_constraints()
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Create statement-level trigger for comprehensive gap detection
            sqlx::query(
                r"
                CREATE TRIGGER validate_batch_versions_after_insert
                    AFTER INSERT ON events
                    FOR EACH STATEMENT
                    EXECUTE FUNCTION validate_batch_versions()
                ",
            )
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

            // Drop the foreign key constraint since we no longer use the event_streams table
            sqlx::query("ALTER TABLE events DROP CONSTRAINT IF EXISTS fk_events_stream_id")
                .execute(self.pool.as_ref())
                .await
                .map_err(PostgresError::Connection)?;

            info!("Database schema initialized successfully");
            Ok::<(), PostgresError>(())
        }
        .await;

        // Always release the advisory lock, regardless of success or failure
        let _unlock_result = sqlx::query("SELECT pg_advisory_unlock($1)")
            .bind(SCHEMA_LOCK_ID)
            .execute(self.pool.as_ref())
            .await;

        result
    }

    /// Check database connectivity with comprehensive health checks
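    ///
    /// # Example
    ///
    /// A monitoring-loop sketch (store setup is assumed elsewhere):
    /// ```no_run
    /// # use eventcore_postgres::PostgresEventStore;
    /// # async fn example<E>(store: &PostgresEventStore<E>) -> Result<(), Box<dyn std::error::Error>>
    /// # where E: serde::Serialize + for<'de> serde::de::Deserialize<'de> + Send + Sync + Clone + std::fmt::Debug + PartialEq + Eq + 'static
    /// # {
    /// let status = store.health_check().await?;
    /// if !status.is_healthy {
    ///     eprintln!("database degraded: {:?}", status);
    /// }
    /// # Ok(())
    /// # }
    /// ```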
    #[instrument(skip(self))]
    pub async fn health_check(&self) -> Result<HealthStatus, PostgresError> {
        let start = std::time::Instant::now();

        // Basic connectivity check
        sqlx::query("SELECT 1")
            .execute(self.pool.as_ref())
            .await
            .map_err(PostgresError::Connection)?;

        let basic_latency = start.elapsed();

        // Advanced health checks
        let pool_status = self.get_pool_status()?;
        let schema_status = self.verify_schema().await?;
        let performance_status = self.check_performance().await?;

        // Aggregate overall health from the individual checks instead of
        // reporting healthy just because the queries completed
        let is_healthy =
            !pool_status.is_closed && schema_status.is_complete && performance_status.is_performant;

        let status = HealthStatus {
            is_healthy,
            basic_latency,
            pool_status,
            schema_status,
            performance_status,
            last_check: chrono::Utc::now(),
        };

        debug!("PostgreSQL health check completed: {:?}", status);
        Ok(status)
    }

    /// Get detailed connection pool status
    fn get_pool_status(&self) -> Result<PoolStatus, PostgresError> {
        let pool = self.pool.as_ref();

        Ok(PoolStatus {
            #[allow(clippy::cast_possible_truncation)]
            size: pool.size() as u32,
            #[allow(clippy::cast_possible_truncation)]
            idle: pool.num_idle() as u32,
            is_closed: pool.is_closed(),
        })
    }

    /// Verify that the required database schema exists
    async fn verify_schema(&self) -> Result<SchemaStatus, PostgresError> {
        let has_events_table = self.table_exists("events").await?;
        // Note: event_streams table is no longer used as of migration 007
        let has_streams_table = false;
        let has_subscriptions_table = self
            .table_exists("subscription_checkpoints")
            .await
            .unwrap_or(false);

        Ok(SchemaStatus {
            has_events_table,
            has_streams_table,
            has_subscriptions_table,
            is_complete: has_events_table,
        })
    }

    /// Check if a specific table exists
    async fn table_exists(&self, table_name: &str) -> Result<bool, PostgresError> {
        let exists = sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1 AND table_schema = 'public')"
        )
        .bind(table_name)
        .fetch_one(self.pool.as_ref())
        .await
        .map_err(PostgresError::Connection)?;

        Ok(exists)
    }

    /// Check database performance characteristics
    async fn check_performance(&self) -> Result<PerformanceStatus, PostgresError> {
        let start = std::time::Instant::now();

        // Test a simple query that exercises indexes
        let _count: i64 = sqlx::query_scalar(
            "SELECT COUNT(*) FROM events WHERE created_at > NOW() - INTERVAL '1 minute'",
        )
        .fetch_one(self.pool.as_ref())
        .await
        .unwrap_or(0); // Gracefully handle if events table doesn't exist yet

        let query_latency = start.elapsed();

        Ok(PerformanceStatus {
            query_latency,
            is_performant: query_latency < Duration::from_millis(100), // 100ms threshold
        })
    }

    /// Attempt to recover from connection issues
    ///
    /// Note: this closes the store's current pool and only verifies that a
    /// fresh pool can connect; callers should recreate the store (or its
    /// pool) after a successful recovery.
    pub async fn recover_connection(&self) -> Result<(), PostgresError> {
        debug!("Attempting connection recovery");

        // Close any potentially stale connections
        if !self.pool.is_closed() {
            self.pool.close().await;
        }

        // Test if we can still connect with a new pool
        let test_pool = Self::create_pool(&self.config).await?;

        // Verify basic connectivity
        sqlx::query("SELECT 1")
            .execute(&test_pool)
            .await
            .map_err(PostgresError::Connection)?;

        test_pool.close().await;

        info!("Connection recovery completed successfully");
        Ok(())
    }

    /// Get current pool metrics
    pub fn get_pool_metrics(&self) -> monitoring::PoolMetrics {
        let pool_status = PoolStatus {
            #[allow(clippy::cast_possible_truncation)]
            size: self.pool.size() as u32,
            #[allow(clippy::cast_possible_truncation)]
            idle: self.pool.num_idle() as u32,
            is_closed: self.pool.is_closed(),
        };
        self.monitor.get_metrics(&pool_status)
    }

    /// Get the pool monitor for advanced monitoring setup
    pub fn monitor(&self) -> Arc<monitoring::PoolMonitor> {
        Arc::clone(&self.monitor)
    }

    /// Start the background pool monitoring task
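    ///
    /// # Example
    ///
    /// A sketch of starting and later stopping the monitor (store setup is
    /// assumed elsewhere):
    /// ```no_run
    /// # use eventcore_postgres::PostgresEventStore;
    /// # async fn example<E>(store: &PostgresEventStore<E>)
    /// # where E: serde::Serialize + for<'de> serde::de::Deserialize<'de> + Send + Sync + Clone + std::fmt::Debug + PartialEq + Eq + 'static
    /// # {
    /// let (task, stop) = store.start_pool_monitoring();
    /// // ... application runs ...
    /// let _ = stop.send(true); // signal the monitoring loop to stop
    /// let _ = task.await;
    /// # }
    /// ```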
    pub fn start_pool_monitoring(
        &self,
    ) -> (
        tokio::task::JoinHandle<()>,
        tokio::sync::watch::Sender<bool>,
    ) {
        let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
        let monitor = Arc::clone(&self.monitor);
        let pool_ref = Arc::clone(&self.pool);
        let interval = self.config.health_check_interval;

        let task = tokio::spawn(async move {
            let monitoring_task = monitoring::PoolMonitoringTask::new(monitor, interval, stop_rx);

            monitoring_task
                .run(move || PoolStatus {
                    #[allow(clippy::cast_possible_truncation)]
                    size: pool_ref.size() as u32,
                    #[allow(clippy::cast_possible_truncation)]
                    idle: pool_ref.num_idle() as u32,
                    is_closed: pool_ref.is_closed(),
                })
                .await;
        });

        (task, stop_tx)
    }

    /// Read events from multiple streams with pagination support.
    ///
    /// This method is designed for efficiently processing large result sets
    /// without loading all events into memory at once.
    ///
    /// # Arguments
    /// * `stream_ids` - The streams to read from
    /// * `options` - Read options (version filters, max events)
    /// * `continuation_token` - Optional continuation token from a previous page
    ///
    /// # Returns
    /// A tuple containing:
    /// * Vector of events for this page
    /// * Optional continuation token for the next page (None if no more results)
    ///
    /// # Example
    /// ```no_run
    /// # use eventcore_postgres::PostgresEventStore;
    /// # use eventcore::{EventId, ReadOptions, StreamId};
    /// # async fn example<E>(store: &PostgresEventStore<E>) -> Result<(), Box<dyn std::error::Error>>
    /// # where E: serde::Serialize + for<'de> serde::de::Deserialize<'de> + Send + Sync + Clone + std::fmt::Debug + PartialEq + Eq + 'static
    /// # {
    /// let stream_ids = vec![StreamId::try_new("stream-1")?, StreamId::try_new("stream-2")?];
    /// let options = ReadOptions::default();
    ///
    /// let mut continuation = None;
    /// loop {
    ///     let (events, next_token) = store.read_paginated(&stream_ids, &options, continuation).await?;
    ///
    ///     // Process events for this page
    ///     for event in events {
    ///         println!("Processing event: {:?}", event.event_id);
    ///     }
    ///
    ///     // Check if there are more pages
    ///     match next_token {
    ///         Some(token) => continuation = Some(token),
    ///         None => break, // No more pages
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn read_paginated(
        &self,
        stream_ids: &[StreamId],
        options: &ReadOptions,
        continuation_token: Option<EventId>,
    ) -> Result<(Vec<StoredEvent<E>>, Option<EventId>), EventStoreError>
    where
        E: Serialize
            + for<'de> Deserialize<'de>
            + Send
            + Sync
            + Clone
            + std::fmt::Debug
            + PartialEq
            + Eq
            + 'static,
    {
        // Delegate to the implementation
        self.read_streams_paginated_impl(stream_ids, options, continuation_token)
            .await
    }
}

// The EventStore trait implementation lives in the event_store module

/// Builder for `PostgreSQL` event store configuration
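///
/// # Example
///
/// A plain builder sketch; the values are illustrative:
/// ```
/// use eventcore_postgres::PostgresConfigBuilder;
/// use std::time::Duration;
///
/// let config = PostgresConfigBuilder::new()
///     .database_url("postgres://user:pass@localhost/eventcore")
///     .max_connections(10)
///     .connect_timeout(Duration::from_secs(5))
///     .build();
/// assert_eq!(config.max_connections, 10);
/// ```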
#[derive(Debug, Default)]
pub struct PostgresConfigBuilder {
    config: PostgresConfig,
}

impl PostgresConfigBuilder {
    /// Create a new configuration builder
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the database URL
    #[must_use]
    pub fn database_url(mut self, url: impl Into<String>) -> Self {
        self.config.database_url = url.into();
        self
    }

    /// Set the maximum number of connections
    #[must_use]
    pub const fn max_connections(mut self, max: u32) -> Self {
        self.config.max_connections = max;
        self
    }

    /// Set the minimum number of idle connections
    #[must_use]
    pub const fn min_connections(mut self, min: u32) -> Self {
        self.config.min_connections = min;
        self
    }

    /// Set the connection timeout
    #[must_use]
    pub const fn connect_timeout(mut self, timeout: Duration) -> Self {
        self.config.connect_timeout = timeout;
        self
    }

    /// Set the maximum connection lifetime
    #[must_use]
    pub const fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
        self.config.max_lifetime = lifetime;
        self
    }

    /// Set the idle timeout
    #[must_use]
    pub const fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.config.idle_timeout = timeout;
        self
    }

    /// Set whether to test connections before acquisition
    #[must_use]
    pub const fn test_before_acquire(mut self, test: bool) -> Self {
        self.config.test_before_acquire = test;
        self
    }

    /// Set the query timeout
    #[must_use]
    pub const fn query_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.config.query_timeout = timeout;
        self
    }

    /// Set the health check interval
    #[must_use]
    pub const fn health_check_interval(mut self, interval: Duration) -> Self {
        self.config.health_check_interval = interval;
        self
    }

    /// Set the maximum number of retries
    #[must_use]
    pub const fn max_retries(mut self, retries: u32) -> Self {
        self.config.max_retries = retries;
        self
    }

    /// Set the retry base delay
    #[must_use]
    pub const fn retry_base_delay(mut self, delay: Duration) -> Self {
        self.config.retry_base_delay = delay;
        self
    }

    /// Set the retry max delay
    #[must_use]
    pub const fn retry_max_delay(mut self, delay: Duration) -> Self {
        self.config.retry_max_delay = delay;
        self
    }

    /// Enable or disable connection recovery
    #[must_use]
    pub const fn enable_recovery(mut self, enable: bool) -> Self {
        self.config.enable_recovery = enable;
        self
    }

    /// Set the read batch size for multi-stream queries
    #[must_use]
    pub const fn read_batch_size(mut self, size: usize) -> Self {
        self.config.read_batch_size = size;
        self
    }

    /// Build the configuration
    pub fn build(self) -> PostgresConfig {
        self.config
    }

    /// Configure for high-performance event sourcing workloads
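    ///
    /// # Example
    ///
    /// ```
    /// use eventcore_postgres::PostgresConfigBuilder;
    ///
    /// // Preset values can still be overridden after calling `performance_optimized`
    /// let config = PostgresConfigBuilder::new()
    ///     .performance_optimized()
    ///     .max_connections(50)
    ///     .build();
    /// assert_eq!(config.max_connections, 50);
    /// assert_eq!(config.min_connections, 5);
    /// ```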
    #[must_use]
    pub const fn performance_optimized(mut self) -> Self {
        self.config.max_connections = 30;
        self.config.min_connections = 5;
        self.config.connect_timeout = Duration::from_secs(5);
        self.config.query_timeout = Some(Duration::from_secs(10)); // Fast query timeout
        self.config.max_lifetime = Some(Duration::from_secs(1800));
        self.config.idle_timeout = Some(Duration::from_secs(300));
        self.config.test_before_acquire = false;
        self
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_postgres_config_default() {
        let config = PostgresConfig::default();
        assert_eq!(config.max_connections, 20); // Updated for performance
        assert_eq!(config.min_connections, 2); // Updated for performance
        assert_eq!(config.connect_timeout, Duration::from_secs(10)); // Updated for performance
        assert!(!config.test_before_acquire); // Updated for performance
        assert_eq!(config.read_batch_size, 1000); // Default batch size
    }

    #[test]
    fn test_postgres_config_builder() {
        let config = PostgresConfigBuilder::new()
            .database_url("postgres://user:pass@localhost/test")
            .max_connections(20)
            .min_connections(2)
            .connect_timeout(Duration::from_secs(10))
            .test_before_acquire(false)
            .read_batch_size(2000)
            .build();

        assert_eq!(config.database_url, "postgres://user:pass@localhost/test");
        assert_eq!(config.max_connections, 20);
        assert_eq!(config.min_connections, 2);
        assert_eq!(config.connect_timeout, Duration::from_secs(10));
        assert!(!config.test_before_acquire);
        assert_eq!(config.read_batch_size, 2000);
    }

    #[test]
    fn test_postgres_error_conversion() {
        let postgres_error = PostgresError::Transaction("test error".to_string());
        let event_store_error: EventStoreError = postgres_error.into();

        assert!(matches!(
            event_store_error,
            EventStoreError::TransactionRollback(_)
        ));
    }

    #[test]
    fn test_postgres_config_new() {
        let config = PostgresConfig::new("postgres://custom/db");
        assert_eq!(config.database_url, "postgres://custom/db");
        // Should use defaults for other fields
        assert_eq!(config.max_connections, 20);
        assert_eq!(config.min_connections, 2);
        assert_eq!(config.connect_timeout, Duration::from_secs(10));
    }

    #[test]
    fn test_postgres_config_all_fields() {
        let config = PostgresConfig {
            database_url: "postgres://test/db".to_string(),
            max_connections: 50,
            min_connections: 10,
            connect_timeout: Duration::from_secs(5),
            query_timeout: Some(Duration::from_secs(60)),
            max_lifetime: Some(Duration::from_secs(3600)),
            idle_timeout: Some(Duration::from_secs(300)),
            test_before_acquire: true,
            max_retries: 5,
            retry_base_delay: Duration::from_millis(200),
            retry_max_delay: Duration::from_secs(10),
            enable_recovery: false,
            health_check_interval: Duration::from_secs(60),
            read_batch_size: 500,
            serialization_format: SerializationFormat::Json,
        };

        assert_eq!(config.max_connections, 50);
        assert_eq!(config.min_connections, 10);
        assert_eq!(config.connect_timeout, Duration::from_secs(5));
        assert_eq!(config.query_timeout, Some(Duration::from_secs(60)));
        assert_eq!(config.max_lifetime, Some(Duration::from_secs(3600)));
        assert_eq!(config.idle_timeout, Some(Duration::from_secs(300)));
        assert!(config.test_before_acquire);
        assert_eq!(config.max_retries, 5);
        assert_eq!(config.retry_base_delay, Duration::from_millis(200));
        assert_eq!(config.retry_max_delay, Duration::from_secs(10));
        assert!(!config.enable_recovery);
        assert_eq!(config.health_check_interval, Duration::from_secs(60));
    }

    #[test]
    fn test_postgres_config_builder_all_methods() {
        let config = PostgresConfigBuilder::new()
            .database_url("postgres://builder/test")
            .max_connections(25)
            .min_connections(3)
            .connect_timeout(Duration::from_secs(8))
            .max_lifetime(Some(Duration::from_secs(1200)))
            .idle_timeout(Some(Duration::from_secs(240)))
            .test_before_acquire(true)
            .query_timeout(Some(Duration::from_secs(45)))
            .build();

        assert_eq!(config.database_url, "postgres://builder/test");
        assert_eq!(config.max_connections, 25);
        assert_eq!(config.min_connections, 3);
        assert_eq!(config.connect_timeout, Duration::from_secs(8));
        assert_eq!(config.max_lifetime, Some(Duration::from_secs(1200)));
        assert_eq!(config.idle_timeout, Some(Duration::from_secs(240)));
        assert!(config.test_before_acquire);
        assert_eq!(config.query_timeout, Some(Duration::from_secs(45)));
    }

    #[test]
    fn test_postgres_config_builder_performance_optimized() {
        let config = PostgresConfigBuilder::new()
            .database_url("postgres://perf/test")
            .performance_optimized()
            .build();

        assert_eq!(config.database_url, "postgres://perf/test");
        assert_eq!(config.max_connections, 30);
        assert_eq!(config.min_connections, 5);
        assert_eq!(config.connect_timeout, Duration::from_secs(5));
        assert_eq!(config.query_timeout, Some(Duration::from_secs(10)));
        assert_eq!(config.max_lifetime, Some(Duration::from_secs(1800)));
        assert_eq!(config.idle_timeout, Some(Duration::from_secs(300)));
        assert!(!config.test_before_acquire);
    }

    #[test]
    fn test_pool_status_fields() {
        let status = PoolStatus {
            size: 10,
            idle: 3,
            is_closed: false,
        };

        assert_eq!(status.size, 10);
        assert_eq!(status.idle, 3);
        assert!(!status.is_closed);
    }

    #[test]
    fn test_health_status_fields() {
        let health = HealthStatus {
            is_healthy: true,
            basic_latency: Duration::from_millis(5),
            pool_status: PoolStatus {
                size: 20,
                idle: 15,
                is_closed: false,
            },
            schema_status: SchemaStatus {
                has_events_table: true,
                has_streams_table: true,
                has_subscriptions_table: false,
                is_complete: true,
            },
            performance_status: PerformanceStatus {
                query_latency: Duration::from_millis(10),
                is_performant: true,
            },
            last_check: chrono::Utc::now(),
        };

        assert!(health.is_healthy);
        assert_eq!(health.basic_latency, Duration::from_millis(5));
        assert_eq!(health.pool_status.size, 20);
        assert!(health.schema_status.is_complete);
        assert!(health.performance_status.is_performant);
    }
}
1298}