sayiir_postgres/
backend.rs1use sayiir_core::codec::{self, Decoder, Encoder};
4use sayiir_core::snapshot::WorkflowSnapshot;
5use sayiir_persistence::BackendError;
6use sqlx::{PgPool, Row};
7
8use crate::error::PgError;
9
10const MIN_PG_MAJOR_VERSION: u32 = 13;
12
13#[derive(Clone)]
32pub struct PostgresBackend<C> {
33 pub(crate) pool: PgPool,
34 pub(crate) codec: C,
35}
36
37impl<C> PostgresBackend<C>
38where
39 C: Default,
40{
41 pub async fn connect(url: &str) -> Result<Self, BackendError> {
47 let pool = PgPool::connect(url).await.map_err(PgError)?;
48 Self::init(pool).await
49 }
50
51 pub async fn connect_with(pool: PgPool) -> Result<Self, BackendError> {
57 Self::init(pool).await
58 }
59
60 async fn init(pool: PgPool) -> Result<Self, BackendError> {
61 check_pg_version(&pool).await?;
62
63 tracing::info!("running postgres migrations");
64 sqlx::migrate!("./migrations")
65 .run(&pool)
66 .await
67 .map_err(|e| BackendError::Backend(format!("migration failed: {e}")))?;
68 tracing::info!("postgres backend ready");
69 Ok(Self {
70 pool,
71 codec: C::default(),
72 })
73 }
74}
75
76impl<C> PostgresBackend<C>
77where
78 C: Encoder + codec::sealed::EncodeValue<WorkflowSnapshot>,
79{
80 pub(crate) fn encode(&self, snapshot: &WorkflowSnapshot) -> Result<Vec<u8>, BackendError> {
82 self.codec
83 .encode(snapshot)
84 .map(|b| b.to_vec())
85 .map_err(|e| BackendError::Serialization(e.to_string()))
86 }
87}
88
89impl<C> PostgresBackend<C>
90where
91 C: Decoder + codec::sealed::DecodeValue<WorkflowSnapshot>,
92{
93 pub(crate) fn decode(&self, data: &[u8]) -> Result<WorkflowSnapshot, BackendError> {
95 self.codec
96 .decode(bytes::Bytes::copy_from_slice(data))
97 .map_err(|e| BackendError::Serialization(e.to_string()))
98 }
99}
100
101async fn check_pg_version(pool: &PgPool) -> Result<(), BackendError> {
106 let row = sqlx::query("SHOW server_version_num")
107 .fetch_one(pool)
108 .await
109 .map_err(PgError)?;
110
111 let version_str: &str = row.get("server_version_num");
112 let version_num: u32 = version_str.parse().map_err(|e| {
113 BackendError::Backend(format!(
114 "failed to parse server_version_num '{version_str}': {e}"
115 ))
116 })?;
117
118 let major = version_num / 10000;
119 tracing::info!(pg_version = major, "connected to PostgreSQL {major}");
120
121 if major < MIN_PG_MAJOR_VERSION {
122 return Err(BackendError::Backend(format!(
123 "PostgreSQL {major} is not supported (minimum: {MIN_PG_MAJOR_VERSION})"
124 )));
125 }
126
127 Ok(())
128}