eventcore-postgres 0.1.3

PostgreSQL adapter for EventCore event sourcing library
eventcore-postgres-0.1.3 has been yanked.

eventcore-postgres

PostgreSQL adapter for EventCore - production-ready event store with ACID guarantees.

Features

  • Multi-stream atomicity via PostgreSQL transactions
  • Optimistic concurrency control with version checking
  • Type-safe serialization - your events, not JSON blobs
  • Connection pooling with sqlx
  • Automatic retries on transient failures
  • Concurrent schema initialization with advisory locks

Installation

[dependencies]
eventcore = "0.1"
eventcore-postgres = "0.1"

Quick Start

use eventcore_postgres::{PostgresConfig, PostgresConfigBuilder, PostgresEventStore};
use eventcore::CommandExecutor;
use std::time::Duration;

// Configure with basic settings
let config = PostgresConfig::new("postgres://user:pass@localhost/eventcore");

// Or use the builder for advanced configuration
let config = PostgresConfigBuilder::new()
    .database_url("postgres://user:pass@localhost/eventcore")
    .max_connections(20)
    .min_connections(2)
    .connect_timeout(Duration::from_secs(10))
    .idle_timeout(Some(Duration::from_secs(600)))
    .max_lifetime(Some(Duration::from_secs(1800)))
    .build();

// Initialize
let store = PostgresEventStore::new(config).await?;
store.initialize().await?; // Creates schema (safe to call multiple times)

// Use with commands
let executor = CommandExecutor::new(store);

Configuration

Connection Pool Options

PostgresConfig provides comprehensive connection pool configuration:

  • max_connections (default: 20) - Maximum connections in the pool
  • min_connections (default: 2) - Minimum idle connections to maintain
  • connect_timeout (default: 10s) - Connection acquisition timeout
  • idle_timeout (default: 600s) - How long connections can remain idle
  • max_lifetime (default: 1800s) - Maximum lifetime of a connection
  • query_timeout (default: 30s) - Timeout for individual queries
  • test_before_acquire (default: false) - Test connections before use

Code Configuration

use eventcore_postgres::{PostgresConfig, PostgresConfigBuilder, PostgresEventStore};
use std::time::Duration;

// Basic configuration
let config = PostgresConfig::new("postgres://localhost/eventcore");

// Advanced configuration with builder
let config = PostgresConfigBuilder::new()
    .database_url("postgres://localhost/eventcore")
    .max_connections(30)
    .min_connections(5)
    .connect_timeout(Duration::from_secs(5))
    .idle_timeout(Some(Duration::from_secs(300)))
    .max_lifetime(Some(Duration::from_secs(1800)))
    .query_timeout(Some(Duration::from_secs(10)))
    .test_before_acquire(false)  // Skip for better performance
    .build();

// Performance-optimized preset
let config = PostgresConfigBuilder::new()
    .database_url("postgres://localhost/eventcore")
    .performance_optimized()  // Applies optimized settings
    .build();

Environment Variables

DATABASE_URL=postgres://localhost/eventcore
# Note: Connection pool settings are configured in code, not via environment variables
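
Because pool settings live in code, a common pattern is to read only the URL from the environment and hand it to the builder. A minimal sketch using just the standard library; `database_url_or` and `resolve_url` are illustrative helper names, not part of eventcore-postgres:

```rust
use std::env;

/// Reads `DATABASE_URL` from the process environment, falling back to
/// `fallback` when the variable is unset or blank.
pub fn database_url_or(fallback: &str) -> String {
    resolve_url(env::var("DATABASE_URL").ok(), fallback)
}

/// Pure helper so the fallback logic can be tested without touching the
/// real environment.
pub fn resolve_url(value: Option<String>, fallback: &str) -> String {
    match value {
        Some(url) if !url.trim().is_empty() => url,
        _ => fallback.to_string(),
    }
}
```

The returned string can then be passed to `.database_url(...)` on the builder shown above.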

Schema

EventCore uses two tables, event_streams and events, with indexes for per-stream and global time ordering:

-- Event streams with version tracking
CREATE TABLE event_streams (
    stream_id VARCHAR(255) PRIMARY KEY,
    version BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Events with efficient ordering
CREATE TABLE events (
    event_id UUID PRIMARY KEY,
    stream_id VARCHAR(255) NOT NULL REFERENCES event_streams(stream_id),
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for performance
CREATE INDEX idx_events_stream_created ON events(stream_id, created_at);
CREATE INDEX idx_events_created_at ON events(created_at);
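
The version column on event_streams is what makes optimistic concurrency checks possible: a write succeeds only if every stream is still at the version the writer read. The following is an in-memory sketch of that rule, not the adapter's actual implementation; `InMemoryStreams`, `VersionConflict`, and `append_all` are hypothetical names:

```rust
use std::collections::HashMap;

/// Returned when a stream has moved past the version the caller expected.
#[derive(Debug, PartialEq)]
pub struct VersionConflict {
    pub stream_id: String,
    pub expected: u64,
    pub actual: u64,
}

/// In-memory stand-in for the stream version table (illustration only).
#[derive(Default)]
pub struct InMemoryStreams {
    versions: HashMap<String, u64>,
}

impl InMemoryStreams {
    /// Each write is (stream_id, expected_version, event_count).
    /// All version checks run before any update, so either every stream
    /// advances or none does, mimicking a single transaction.
    pub fn append_all(&mut self, writes: &[(&str, u64, u64)]) -> Result<(), VersionConflict> {
        for &(stream_id, expected, _) in writes {
            let actual = self.version(stream_id);
            if actual != expected {
                return Err(VersionConflict {
                    stream_id: stream_id.to_string(),
                    expected,
                    actual,
                });
            }
        }
        for &(stream_id, expected, event_count) in writes {
            self.versions
                .insert(stream_id.to_string(), expected + event_count);
        }
        Ok(())
    }

    /// Current version of a stream; unknown streams start at 0.
    pub fn version(&self, stream_id: &str) -> u64 {
        self.versions.get(stream_id).copied().unwrap_or(0)
    }
}
```

In the real store the check and the update would run inside one PostgreSQL transaction rather than against a HashMap.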

Production Considerations

Connection Pool Sizing

// For high-throughput systems
let config = PostgresConfigBuilder::new()
    .database_url(database_url)
    .max_connections(num_cpus::get() as u32 * 2)  // Rule of thumb: 2x CPU cores
    .min_connections(num_cpus::get() as u32)      // Keep connections warm
    .idle_timeout(Some(Duration::from_secs(300))) // 5 minutes
    .max_lifetime(Some(Duration::from_secs(3600))) // 1 hour
    .build();

// For bursty workloads
let config = PostgresConfigBuilder::new()
    .database_url(database_url)
    .max_connections(50)                          // Higher max for bursts
    .min_connections(5)                           // Lower minimum
    .connect_timeout(Duration::from_secs(2))      // Fast timeout
    .idle_timeout(Some(Duration::from_secs(60)))  // Aggressive cleanup
    .build();

// For long-running operations
let config = PostgresConfigBuilder::new()
    .database_url(database_url)
    .max_connections(15)
    .query_timeout(Some(Duration::from_secs(300))) // 5 minute queries
    .max_lifetime(None)                            // No lifetime limit
    .build();

Transaction Isolation

EventCore uses SERIALIZABLE isolation for multi-stream writes, ensuring:

  • No phantom reads
  • No write skew
  • Full ACID compliance
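
Under SERIALIZABLE isolation, PostgreSQL may abort a transaction with a serialization failure instead of letting an anomaly through, and the client is expected to re-run the whole transaction. A minimal sketch of classifying such errors by SQLSTATE code; the function name is hypothetical, and this is not necessarily how eventcore-postgres decides what to retry:

```rust
/// Returns true for SQLSTATE codes where re-running the whole transaction
/// is the standard remedy.
pub fn is_retryable_sqlstate(code: &str) -> bool {
    matches!(
        code,
        "40001"   // serialization_failure
        | "40P01" // deadlock_detected
    )
}
```

Constraint violations such as 23505 (unique_violation) are deliberately excluded, since retrying the same write would fail again.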

Performance Tuning

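-- Increase shared_buffers for better caching (takes effect only after a server restart)
ALTER SYSTEM SET shared_buffers = '256MB';

-- Enable parallel queries
ALTER SYSTEM SET max_parallel_workers_per_gather = 4;

-- Tune for SSDs
ALTER SYSTEM SET random_page_cost = 1.1;

-- Apply the reloadable settings above without a restart
SELECT pg_reload_conf();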

For information about how EventCore leverages PostgreSQL prepared statements for optimal performance, see Prepared Statement Performance.

Monitoring

The adapter provides comprehensive connection pool monitoring:

// Get current pool metrics
let metrics = store.get_pool_metrics();
println!("Active connections: {}", metrics.active_connections);
println!("Idle connections: {}", metrics.idle_connections);
println!("Pool utilization: {:.1}%", metrics.utilization_percent);

// Start background monitoring
let (monitor_task, stop_tx) = store.start_pool_monitoring();

// Access detailed health information
let health = store.health_check().await?;
println!("Pool healthy: {}", !health.pool_status.is_closed);
println!("Query latency: {:?}", health.performance_status.query_latency);

// Enable detailed tracing
tracing_subscriber::fmt()
    .with_env_filter("eventcore_postgres=debug")
    .init();

Key metrics to monitor:

  • current_connections - Active connections in pool
  • idle_connections - Available connections
  • utilization_percent - Pool usage (0-100%)
  • connection_timeouts - Failed connection attempts
  • avg_acquisition_time - Time to get a connection
  • peak_connections - Historical maximum
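
To make the utilization figure concrete, here is a sketch of how such a percentage could be derived from raw counts. It assumes utilization means active connections divided by the configured maximum; `PoolSnapshot` is a hypothetical type, not the adapter's metrics struct:

```rust
/// Point-in-time view of a connection pool (hypothetical, for illustration).
#[derive(Debug, Clone, Copy)]
pub struct PoolSnapshot {
    pub active_connections: u32,
    pub idle_connections: u32,
    pub max_connections: u32,
}

impl PoolSnapshot {
    /// Share of the configured maximum that is checked out, clamped to 0-100.
    /// Assumption: utilization = active / max.
    pub fn utilization_percent(&self) -> f64 {
        if self.max_connections == 0 {
            return 0.0;
        }
        let pct =
            f64::from(self.active_connections) * 100.0 / f64::from(self.max_connections);
        pct.min(100.0)
    }

    /// True when utilization is strictly above the alert threshold.
    pub fn is_saturated(&self, threshold_percent: f64) -> bool {
        self.utilization_percent() > threshold_percent
    }
}
```

A threshold check like `is_saturated(80.0)` is a reasonable starting point for alerting, to be tuned against observed acquisition times.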

Testing

For integration tests, use Docker:

# docker-compose.yml
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: eventcore_test
      POSTGRES_PASSWORD: postgres
    ports:
      - "5433:5432"

#[tokio::test]
async fn test_with_postgres() {
    let config = PostgresConfig::test_default(); // Uses TEST_DATABASE_URL
    let store = PostgresEventStore::new(config).await.unwrap();
    store.initialize().await.unwrap();
    
    // Run your tests...
}

Migration from Other Event Stores

From EventStoreDB

// EventCore handles projections differently
// Instead of catch-up subscriptions, use:
let projection = MyProjection::new();
let runner = ProjectionRunner::new(store, projection);
runner.run_continuous().await?;

From Axon

// No more aggregate classes!
// Commands handle their own state:
impl Command for MyCommand {
    fn apply(&self, state: &mut State, event: &StoredEvent<Event>) {
        // Direct state manipulation
    }
}

Troubleshooting

"Version conflict" errors

Another writer appended to the same stream first, so the expected version no longer matched. This is optimistic concurrency control working correctly. EventCore will automatically retry with exponential backoff.
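
For intuition about what exponential backoff means for retry timing, here is a sketch of a capped delay schedule. The base delay, cap, and function name are assumptions for illustration; EventCore's actual retry parameters are not specified here:

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt,
/// never exceeding `max`. Overflow saturates to `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt as a u32, saturating once the shift would overflow.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a 100 ms base and a 5 s cap, the delays run 100 ms, 200 ms, 400 ms, 800 ms and so on until they reach the cap. Production retry loops often add random jitter on top so that competing writers do not retry in lockstep.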

"Connection pool timeout"

This indicates all connections are busy. Solutions:

// 1. Increase pool size
let config = PostgresConfigBuilder::new()
    .database_url(database_url)
    .max_connections(30)  // Increase from default 20
    .connect_timeout(Duration::from_secs(10))  // Give more time
    .build();

// 2. Check pool health
let metrics = store.get_pool_metrics();
if metrics.utilization_percent > 80.0 {
    tracing::warn!("Pool utilization high: {:.1}%", metrics.utilization_percent);
}

// 3. Enable connection recovery
let config = PostgresConfig {
    enable_recovery: true,  // Automatic recovery on failures
    max_retries: 5,        // More retry attempts
    ..PostgresConfig::new(database_url)
};

Schema initialization hangs

Another process is probably initializing the schema at the same time. This is normal: EventCore uses advisory locks to prevent race conditions, so the waiting call should proceed once the lock is released.
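
As background on the mechanism, PostgreSQL advisory locks are keyed by a 64-bit integer that every cooperating process must agree on. The sketch below derives a stable key from a name with FNV-1a and pairs it with the built-in pg_advisory_lock and pg_advisory_unlock functions. How eventcore-postgres actually chooses its key is not documented here, so the names and the hashing choice are assumptions:

```rust
/// Session-level advisory lock statements (built-in PostgreSQL functions).
/// `pg_advisory_lock` blocks until the lock is available.
pub const ACQUIRE_LOCK_SQL: &str = "SELECT pg_advisory_lock($1)";
pub const RELEASE_LOCK_SQL: &str = "SELECT pg_advisory_unlock($1)";

/// Derives a stable lock key from a name using 64-bit FNV-1a.
/// A hand-written hash is used because the key must be identical across
/// processes and builds, which std's default hasher does not promise.
pub fn advisory_lock_key(name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    let mut hash = OFFSET_BASIS;
    for byte in name.as_bytes() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(PRIME);
    }
    // PostgreSQL takes a signed bigint; reinterpret the bits.
    hash as i64
}
```

A process would run the acquire statement with this key, perform its schema DDL, then run the release statement. Any other process using the same key waits on the acquire call in the meantime, which is the "hang" described above.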

See Also