Skip to main content

sayiir_postgres/
backend.rs

1//! `PostgresBackend` struct and constructors.
2
3use sayiir_core::codec::{self, Decoder, Encoder};
4use sayiir_core::snapshot::WorkflowSnapshot;
5use sayiir_persistence::BackendError;
6use sqlx::{PgPool, Row};
7
8use crate::error::PgError;
9
/// Oldest PostgreSQL major version this backend accepts.
///
/// `check_pg_version` compares the connected server's major version against
/// this value and returns an error when the server is older.
const MIN_PG_MAJOR_VERSION: u32 = 13;
12
13/// PostgreSQL persistence backend for Sayiir workflows.
14///
15/// Generic over a [`Codec`](sayiir_core::codec::Codec) that determines how
16/// snapshots are serialized into the `BYTEA` column. Use `JsonCodec` for
17/// human-readable storage with Postgres-side queryability, or a binary codec
18/// for faster (de)serialization.
19///
20/// # Example (with `sayiir-runtime` JSON codec)
21///
22/// ```rust,no_run
23/// use sayiir_postgres::PostgresBackend;
24/// use sayiir_runtime::serialization::JsonCodec;
25///
26/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
27/// let backend = PostgresBackend::<JsonCodec>::connect("postgresql://localhost/sayiir").await?;
28/// # Ok(())
29/// # }
30/// ```
31#[derive(Clone)]
32pub struct PostgresBackend<C> {
33    pub(crate) pool: PgPool,
34    pub(crate) codec: C,
35}
36
37impl<C> PostgresBackend<C>
38where
39    C: Default,
40{
41    /// Connect to Postgres and run migrations.
42    ///
43    /// # Errors
44    ///
45    /// Returns an error if the connection or migration fails.
46    pub async fn connect(url: &str) -> Result<Self, BackendError> {
47        let pool = PgPool::connect(url).await.map_err(PgError)?;
48        Self::init(pool).await
49    }
50
51    /// Use an existing connection pool and run migrations.
52    ///
53    /// # Errors
54    ///
55    /// Returns an error if the migration fails.
56    pub async fn connect_with(pool: PgPool) -> Result<Self, BackendError> {
57        Self::init(pool).await
58    }
59
60    async fn init(pool: PgPool) -> Result<Self, BackendError> {
61        check_pg_version(&pool).await?;
62
63        tracing::info!("running postgres migrations");
64        sqlx::migrate!("./migrations")
65            .run(&pool)
66            .await
67            .map_err(|e| BackendError::Backend(format!("migration failed: {e}")))?;
68        tracing::info!("postgres backend ready");
69        Ok(Self {
70            pool,
71            codec: C::default(),
72        })
73    }
74}
75
76impl<C> PostgresBackend<C>
77where
78    C: Encoder + codec::sealed::EncodeValue<WorkflowSnapshot>,
79{
80    /// Encode a snapshot using the configured codec.
81    pub(crate) fn encode(&self, snapshot: &WorkflowSnapshot) -> Result<Vec<u8>, BackendError> {
82        self.codec
83            .encode(snapshot)
84            .map(|b| b.to_vec())
85            .map_err(|e| BackendError::Serialization(e.to_string()))
86    }
87}
88
89impl<C> PostgresBackend<C>
90where
91    C: Decoder + codec::sealed::DecodeValue<WorkflowSnapshot>,
92{
93    /// Decode a snapshot from raw bytes using the configured codec.
94    pub(crate) fn decode(&self, data: &[u8]) -> Result<WorkflowSnapshot, BackendError> {
95        self.codec
96            .decode(bytes::Bytes::copy_from_slice(data))
97            .map_err(|e| BackendError::Serialization(e.to_string()))
98    }
99}
100
101/// Query `SHOW server_version_num` and reject versions below [`MIN_PG_MAJOR_VERSION`].
102///
103/// PostgreSQL encodes its version as a single integer: major * 10000 + minor.
104/// For example 130005 = 13.5, 170001 = 17.1.
105async fn check_pg_version(pool: &PgPool) -> Result<(), BackendError> {
106    let row = sqlx::query("SHOW server_version_num")
107        .fetch_one(pool)
108        .await
109        .map_err(PgError)?;
110
111    let version_str: &str = row.get("server_version_num");
112    let version_num: u32 = version_str.parse().map_err(|e| {
113        BackendError::Backend(format!(
114            "failed to parse server_version_num '{version_str}': {e}"
115        ))
116    })?;
117
118    let major = version_num / 10000;
119    tracing::info!(pg_version = major, "connected to PostgreSQL {major}");
120
121    if major < MIN_PG_MAJOR_VERSION {
122        return Err(BackendError::Backend(format!(
123            "PostgreSQL {major} is not supported (minimum: {MIN_PG_MAJOR_VERSION})"
124        )));
125    }
126
127    Ok(())
128}