Skip to main content

sayiir_postgres/
backend.rs

1//! `PostgresBackend` struct and constructors.
2
3use sayiir_core::codec::{self, Decoder, Encoder};
4use sayiir_core::snapshot::WorkflowSnapshot;
5use sayiir_persistence::BackendError;
6use sqlx::{PgPool, Row};
7
8use crate::error::PgError;
9
/// Minimum supported PostgreSQL major version.
///
/// `check_pg_version` compares the server's reported major version against
/// this value and returns an error when the server is older.
const MIN_PG_MAJOR_VERSION: u32 = 13;
12
13/// PostgreSQL persistence backend for Sayiir workflows.
14///
15/// Generic over a [`Codec`](sayiir_core::codec::Codec) that determines how
16/// snapshots are serialized into the `BYTEA` column. Use `JsonCodec` for
17/// human-readable storage with Postgres-side queryability, or a binary codec
18/// for faster (de)serialization.
19///
20/// # Example (with `sayiir-runtime` JSON codec)
21///
22/// ```rust,no_run
23/// use sayiir_postgres::PostgresBackend;
24/// use sayiir_runtime::serialization::JsonCodec;
25///
26/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
27/// let backend = PostgresBackend::<JsonCodec>::connect("postgresql://localhost/sayiir").await?;
28/// # Ok(())
29/// # }
30/// ```
31pub struct PostgresBackend<C> {
32    pub(crate) pool: PgPool,
33    pub(crate) codec: C,
34}
35
36impl<C> PostgresBackend<C>
37where
38    C: Default,
39{
40    /// Connect to Postgres and run migrations.
41    ///
42    /// # Errors
43    ///
44    /// Returns an error if the connection or migration fails.
45    pub async fn connect(url: &str) -> Result<Self, BackendError> {
46        let pool = PgPool::connect(url).await.map_err(PgError)?;
47        Self::init(pool).await
48    }
49
50    /// Use an existing connection pool and run migrations.
51    ///
52    /// # Errors
53    ///
54    /// Returns an error if the migration fails.
55    pub async fn connect_with(pool: PgPool) -> Result<Self, BackendError> {
56        Self::init(pool).await
57    }
58
59    async fn init(pool: PgPool) -> Result<Self, BackendError> {
60        check_pg_version(&pool).await?;
61
62        tracing::info!("running postgres migrations");
63        sqlx::migrate!("./migrations")
64            .run(&pool)
65            .await
66            .map_err(|e| BackendError::Backend(format!("migration failed: {e}")))?;
67        tracing::info!("postgres backend ready");
68        Ok(Self {
69            pool,
70            codec: C::default(),
71        })
72    }
73}
74
75impl<C> PostgresBackend<C>
76where
77    C: Encoder + codec::sealed::EncodeValue<WorkflowSnapshot>,
78{
79    /// Encode a snapshot using the configured codec.
80    pub(crate) fn encode(&self, snapshot: &WorkflowSnapshot) -> Result<Vec<u8>, BackendError> {
81        self.codec
82            .encode(snapshot)
83            .map(|b| b.to_vec())
84            .map_err(|e| BackendError::Serialization(e.to_string()))
85    }
86}
87
88impl<C> PostgresBackend<C>
89where
90    C: Decoder + codec::sealed::DecodeValue<WorkflowSnapshot>,
91{
92    /// Decode a snapshot from raw bytes using the configured codec.
93    pub(crate) fn decode(&self, data: &[u8]) -> Result<WorkflowSnapshot, BackendError> {
94        self.codec
95            .decode(bytes::Bytes::copy_from_slice(data))
96            .map_err(|e| BackendError::Serialization(e.to_string()))
97    }
98}
99
100/// Query `SHOW server_version_num` and reject versions below [`MIN_PG_MAJOR_VERSION`].
101///
102/// PostgreSQL encodes its version as a single integer: major * 10000 + minor.
103/// For example 130005 = 13.5, 170001 = 17.1.
104async fn check_pg_version(pool: &PgPool) -> Result<(), BackendError> {
105    let row = sqlx::query("SHOW server_version_num")
106        .fetch_one(pool)
107        .await
108        .map_err(PgError)?;
109
110    let version_str: &str = row.get("server_version_num");
111    let version_num: u32 = version_str.parse().map_err(|e| {
112        BackendError::Backend(format!(
113            "failed to parse server_version_num '{version_str}': {e}"
114        ))
115    })?;
116
117    let major = version_num / 10000;
118    tracing::info!(pg_version = major, "connected to PostgreSQL {major}");
119
120    if major < MIN_PG_MAJOR_VERSION {
121        return Err(BackendError::Backend(format!(
122            "PostgreSQL {major} is not supported (minimum: {MIN_PG_MAJOR_VERSION})"
123        )));
124    }
125
126    Ok(())
127}