sayiir_postgres/
backend.rs1use sayiir_core::codec::{self, Decoder, Encoder};
4use sayiir_core::snapshot::WorkflowSnapshot;
5use sayiir_persistence::BackendError;
6use sqlx::{PgPool, Row};
7
8use crate::error::PgError;
9
10const MIN_PG_MAJOR_VERSION: u32 = 13;
12
13pub struct PostgresBackend<C> {
32 pub(crate) pool: PgPool,
33 pub(crate) codec: C,
34}
35
36impl<C> PostgresBackend<C>
37where
38 C: Default,
39{
40 pub async fn connect(url: &str) -> Result<Self, BackendError> {
46 let pool = PgPool::connect(url).await.map_err(PgError)?;
47 Self::init(pool).await
48 }
49
50 pub async fn connect_with(pool: PgPool) -> Result<Self, BackendError> {
56 Self::init(pool).await
57 }
58
59 async fn init(pool: PgPool) -> Result<Self, BackendError> {
60 check_pg_version(&pool).await?;
61
62 tracing::info!("running postgres migrations");
63 sqlx::migrate!("./migrations")
64 .run(&pool)
65 .await
66 .map_err(|e| BackendError::Backend(format!("migration failed: {e}")))?;
67 tracing::info!("postgres backend ready");
68 Ok(Self {
69 pool,
70 codec: C::default(),
71 })
72 }
73}
74
75impl<C> PostgresBackend<C>
76where
77 C: Encoder + codec::sealed::EncodeValue<WorkflowSnapshot>,
78{
79 pub(crate) fn encode(&self, snapshot: &WorkflowSnapshot) -> Result<Vec<u8>, BackendError> {
81 self.codec
82 .encode(snapshot)
83 .map(|b| b.to_vec())
84 .map_err(|e| BackendError::Serialization(e.to_string()))
85 }
86}
87
88impl<C> PostgresBackend<C>
89where
90 C: Decoder + codec::sealed::DecodeValue<WorkflowSnapshot>,
91{
92 pub(crate) fn decode(&self, data: &[u8]) -> Result<WorkflowSnapshot, BackendError> {
94 self.codec
95 .decode(bytes::Bytes::copy_from_slice(data))
96 .map_err(|e| BackendError::Serialization(e.to_string()))
97 }
98}
99
100async fn check_pg_version(pool: &PgPool) -> Result<(), BackendError> {
105 let row = sqlx::query("SHOW server_version_num")
106 .fetch_one(pool)
107 .await
108 .map_err(PgError)?;
109
110 let version_str: &str = row.get("server_version_num");
111 let version_num: u32 = version_str.parse().map_err(|e| {
112 BackendError::Backend(format!(
113 "failed to parse server_version_num '{version_str}': {e}"
114 ))
115 })?;
116
117 let major = version_num / 10000;
118 tracing::info!(pg_version = major, "connected to PostgreSQL {major}");
119
120 if major < MIN_PG_MAJOR_VERSION {
121 return Err(BackendError::Backend(format!(
122 "PostgreSQL {major} is not supported (minimum: {MIN_PG_MAJOR_VERSION})"
123 )));
124 }
125
126 Ok(())
127}