eventcore_postgres/
lib.rs

1use std::time::Duration;
2
3use eventcore_types::{
4    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
5    EventStreamReader, EventStreamSlice, Operation, StreamId, StreamPosition, StreamWriteEntry,
6    StreamWrites,
7};
8use nutype::nutype;
9use serde_json::{Value, json};
10use sqlx::types::Json;
11use sqlx::{Pool, Postgres, Row, postgres::PgPoolOptions, query};
12use thiserror::Error;
13use tracing::{error, info, instrument, warn};
14use uuid::Uuid;
15
16#[derive(Debug, Error)]
17pub enum PostgresEventStoreError {
18    #[error("failed to create postgres connection pool")]
19    ConnectionFailed(#[source] sqlx::Error),
20}
21
22/// Maximum number of database connections in the pool.
23///
24/// MaxConnections represents the connection pool size limit. It must be at least 1,
25/// enforced by using NonZeroU32 as the underlying type.
26///
27/// # Examples
28///
29/// ```ignore
30/// use eventcore_postgres::MaxConnections;
31/// use std::num::NonZeroU32;
32///
33/// let small_pool = MaxConnections::new(NonZeroU32::new(5).expect("5 is non-zero"));
34/// let standard = MaxConnections::new(NonZeroU32::new(10).expect("10 is non-zero"));
35/// let large_pool = MaxConnections::new(NonZeroU32::new(50).expect("50 is non-zero"));
36///
37/// // Zero connections not allowed by type system
38/// // let zero = NonZeroU32::new(0); // Returns None
39/// ```
40#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
41pub struct MaxConnections(std::num::NonZeroU32);
42
43/// Configuration for PostgresEventStore connection pool.
44#[derive(Debug, Clone)]
45pub struct PostgresConfig {
46    /// Maximum number of connections in the pool (default: 10)
47    pub max_connections: MaxConnections,
48    /// Timeout for acquiring a connection from the pool (default: 30 seconds)
49    pub acquire_timeout: Duration,
50    /// Idle timeout for connections in the pool (default: 10 minutes)
51    pub idle_timeout: Duration,
52}
53
54impl Default for PostgresConfig {
55    fn default() -> Self {
56        const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
57            Some(v) => v,
58            None => unreachable!(),
59        };
60
61        Self {
62            max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
63            acquire_timeout: Duration::from_secs(30),
64            idle_timeout: Duration::from_secs(600), // 10 minutes
65        }
66    }
67}
68
69#[derive(Debug, Clone)]
70pub struct PostgresEventStore {
71    pool: Pool<Postgres>,
72}
73
74impl PostgresEventStore {
75    /// Create a new PostgresEventStore with default configuration.
76    pub async fn new<S: Into<String>>(
77        connection_string: S,
78    ) -> Result<Self, PostgresEventStoreError> {
79        Self::with_config(connection_string, PostgresConfig::default()).await
80    }
81
82    /// Create a new PostgresEventStore with custom configuration.
83    pub async fn with_config<S: Into<String>>(
84        connection_string: S,
85        config: PostgresConfig,
86    ) -> Result<Self, PostgresEventStoreError> {
87        let connection_string = connection_string.into();
88        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
89        let pool = PgPoolOptions::new()
90            .max_connections(max_connections.get())
91            .acquire_timeout(config.acquire_timeout)
92            .idle_timeout(config.idle_timeout)
93            .connect(&connection_string)
94            .await
95            .map_err(PostgresEventStoreError::ConnectionFailed)?;
96        Ok(Self { pool })
97    }
98
99    /// Create a PostgresEventStore from an existing connection pool.
100    ///
101    /// Use this when you need full control over pool configuration or want to
102    /// share a pool across multiple components.
103    pub fn from_pool(pool: Pool<Postgres>) -> Self {
104        Self { pool }
105    }
106
107    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
108    pub async fn ping(&self) {
109        query("SELECT 1")
110            .execute(&self.pool)
111            .await
112            .expect("postgres ping failed");
113    }
114
115    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
116    pub async fn migrate(&self) {
117        sqlx::migrate!("./migrations")
118            .run(&self.pool)
119            .await
120            .expect("postgres migration failed");
121    }
122}
123
124impl EventStore for PostgresEventStore {
125    #[instrument(name = "postgres.read_stream", skip(self))]
126    async fn read_stream<E: Event>(
127        &self,
128        stream_id: StreamId,
129    ) -> Result<EventStreamReader<E>, EventStoreError> {
130        info!(
131            stream = %stream_id,
132            "[postgres.read_stream] reading events from postgres"
133        );
134
135        let rows = query(
136            "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
137        )
138        .bind(stream_id.as_ref())
139        .fetch_all(&self.pool)
140        .await
141        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
142
143        let mut events = Vec::with_capacity(rows.len());
144        for row in rows {
145            let payload: Value = row
146                .try_get("event_data")
147                .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
148            let event = serde_json::from_value(payload).map_err(|error| {
149                EventStoreError::DeserializationFailed {
150                    stream_id: stream_id.clone(),
151                    detail: error.to_string(),
152                }
153            })?;
154            events.push(event);
155        }
156
157        Ok(EventStreamReader::new(events))
158    }
159
160    #[instrument(name = "postgres.append_events", skip(self, writes))]
161    async fn append_events(
162        &self,
163        writes: StreamWrites,
164    ) -> Result<EventStreamSlice, EventStoreError> {
165        let expected_versions = writes.expected_versions().clone();
166        let entries = writes.into_entries();
167
168        if entries.is_empty() {
169            return Ok(EventStreamSlice);
170        }
171
172        info!(
173            stream_count = expected_versions.len(),
174            event_count = entries.len(),
175            "[postgres.append_events] appending events to postgres"
176        );
177
178        // Build expected versions JSON for the trigger
179        let expected_versions_json: Value = expected_versions
180            .iter()
181            .map(|(stream_id, version)| {
182                (stream_id.as_ref().to_string(), json!(version.into_inner()))
183            })
184            .collect();
185
186        let mut tx = self
187            .pool
188            .begin()
189            .await
190            .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;
191
192        // Set expected versions in session config for trigger validation
193        query("SELECT set_config('eventcore.expected_versions', $1, true)")
194            .bind(expected_versions_json.to_string())
195            .execute(&mut *tx)
196            .await
197            .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;
198
199        // Insert all events - trigger handles version assignment and validation
200        for entry in entries {
201            let StreamWriteEntry {
202                stream_id,
203                event_type,
204                event_data,
205                ..
206            } = entry;
207
208            let event_id = Uuid::now_v7();
209            query(
210                "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata)
211                 VALUES ($1, $2, $3, $4, $5)",
212            )
213            .bind(event_id)
214            .bind(stream_id.as_ref())
215            .bind(event_type)
216            .bind(Json(event_data))
217            .bind(Json(json!({})))
218            .execute(&mut *tx)
219            .await
220            .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
221        }
222
223        tx.commit()
224            .await
225            .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
226
227        Ok(EventStreamSlice)
228    }
229}
230
231impl EventReader for PostgresEventStore {
232    type Error = EventStoreError;
233
234    async fn read_events<E: Event>(
235        &self,
236        filter: EventFilter,
237        page: EventPage,
238    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
239        // Query events ordered by event_id (UUID7, monotonically increasing).
240        // Use event_id directly as the global position - no need for ROW_NUMBER.
241        let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
242        let limit: i64 = page.limit().into_inner() as i64;
243
244        let rows = if let Some(prefix) = filter.stream_prefix() {
245            let prefix_str = prefix.as_ref();
246
247            if let Some(after_id) = after_event_id {
248                let query_str = r#"
249                    SELECT event_id, event_data, stream_id
250                    FROM eventcore_events
251                    WHERE event_id > $1
252                      AND stream_id LIKE $2 || '%'
253                    ORDER BY event_id
254                    LIMIT $3
255                "#;
256                query(query_str)
257                    .bind(after_id)
258                    .bind(prefix_str)
259                    .bind(limit)
260                    .fetch_all(&self.pool)
261                    .await
262            } else {
263                let query_str = r#"
264                    SELECT event_id, event_data, stream_id
265                    FROM eventcore_events
266                    WHERE stream_id LIKE $1 || '%'
267                    ORDER BY event_id
268                    LIMIT $2
269                "#;
270                query(query_str)
271                    .bind(prefix_str)
272                    .bind(limit)
273                    .fetch_all(&self.pool)
274                    .await
275            }
276        } else if let Some(after_id) = after_event_id {
277            let query_str = r#"
278                SELECT event_id, event_data, stream_id
279                FROM eventcore_events
280                WHERE event_id > $1
281                ORDER BY event_id
282                LIMIT $2
283            "#;
284            query(query_str)
285                .bind(after_id)
286                .bind(limit)
287                .fetch_all(&self.pool)
288                .await
289        } else {
290            let query_str = r#"
291                SELECT event_id, event_data, stream_id
292                FROM eventcore_events
293                ORDER BY event_id
294                LIMIT $1
295            "#;
296            query(query_str).bind(limit).fetch_all(&self.pool).await
297        }
298        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
299
300        let events: Vec<(E, StreamPosition)> = rows
301            .into_iter()
302            .filter_map(|row| {
303                let event_data: Json<Value> = row.get("event_data");
304                let event_id: Uuid = row.get("event_id");
305                serde_json::from_value::<E>(event_data.0)
306                    .ok()
307                    .map(|e| (e, StreamPosition::new(event_id)))
308            })
309            .collect();
310
311        Ok(events)
312    }
313}
314
315fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
316    if let sqlx::Error::Database(db_error) = &error {
317        let code = db_error.code();
318        let code_str = code.as_deref();
319        // P0001: Custom error from trigger (version_conflict)
320        // 23505: Unique constraint violation (fallback for version conflict)
321        if code_str == Some("P0001") || code_str == Some("23505") {
322            warn!(
323                error = %db_error,
324                "[postgres.version_conflict] optimistic concurrency check failed"
325            );
326            return EventStoreError::VersionConflict;
327        }
328    }
329
330    error!(
331        error = %error,
332        operation = %operation,
333        "[postgres.database_error] database operation failed"
334    );
335    EventStoreError::StoreFailure { operation }
336}
337
338/// Error type for PostgresCheckpointStore operations.
339#[derive(Debug, Error)]
340pub enum PostgresCheckpointError {
341    /// Failed to create connection pool.
342    #[error("failed to create postgres connection pool")]
343    ConnectionFailed(#[source] sqlx::Error),
344
345    /// Database operation failed.
346    #[error("database operation failed: {0}")]
347    DatabaseError(#[source] sqlx::Error),
348}
349
350/// Postgres-backed checkpoint store for tracking projection progress.
351///
352/// `PostgresCheckpointStore` stores checkpoint positions in a PostgreSQL table,
353/// providing durability across process restarts. It implements the `CheckpointStore`
354/// trait from eventcore-types.
355///
356/// # Schema
357///
358/// The store uses the `eventcore_subscription_versions` table with:
359/// - `subscription_name`: Unique identifier for each projector/subscription
360/// - `last_position`: UUID7 representing the global stream position
361/// - `updated_at`: Timestamp of the last checkpoint update
362#[derive(Debug, Clone)]
363pub struct PostgresCheckpointStore {
364    pool: Pool<Postgres>,
365}
366
367impl PostgresCheckpointStore {
368    /// Create a new PostgresCheckpointStore with default configuration.
369    pub async fn new<S: Into<String>>(
370        connection_string: S,
371    ) -> Result<Self, PostgresCheckpointError> {
372        Self::with_config(connection_string, PostgresConfig::default()).await
373    }
374
375    /// Create a new PostgresCheckpointStore with custom configuration.
376    pub async fn with_config<S: Into<String>>(
377        connection_string: S,
378        config: PostgresConfig,
379    ) -> Result<Self, PostgresCheckpointError> {
380        let connection_string = connection_string.into();
381        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
382        let pool = PgPoolOptions::new()
383            .max_connections(max_connections.get())
384            .acquire_timeout(config.acquire_timeout)
385            .idle_timeout(config.idle_timeout)
386            .connect(&connection_string)
387            .await
388            .map_err(PostgresCheckpointError::ConnectionFailed)?;
389
390        // Run migrations to ensure table exists
391        sqlx::migrate!("./migrations")
392            .run(&pool)
393            .await
394            .map_err(|e| {
395                PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
396            })?;
397
398        Ok(Self { pool })
399    }
400
401    /// Create a PostgresCheckpointStore from an existing connection pool.
402    ///
403    /// Use this when you need full control over pool configuration or want to
404    /// share a pool across multiple components.
405    pub fn from_pool(pool: Pool<Postgres>) -> Self {
406        Self { pool }
407    }
408}
409
410impl CheckpointStore for PostgresCheckpointStore {
411    type Error = PostgresCheckpointError;
412
413    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
414        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
415            .bind(name)
416            .fetch_optional(&self.pool)
417            .await
418            .map_err(PostgresCheckpointError::DatabaseError)?;
419
420        match row {
421            Some(row) => {
422                let position: Uuid = row.get("last_position");
423                Ok(Some(StreamPosition::new(position)))
424            }
425            None => Ok(None),
426        }
427    }
428
429    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
430        let position_uuid: Uuid = position.into_inner();
431        query(
432            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
433             VALUES ($1, $2, NOW())
434             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
435        )
436        .bind(name)
437        .bind(position_uuid)
438        .execute(&self.pool)
439        .await
440        .map_err(PostgresCheckpointError::DatabaseError)?;
441
442        Ok(())
443    }
444}