graphile_worker_database 0.1.5

Database driver abstraction for graphile_worker
Documentation
use futures::StreamExt;
use sqlx::PgPool;

use super::transaction::SqlxTransaction;
use crate::{Database, DatabaseDriver, DbError, DbTransaction, Notification, NotificationStream};

#[derive(Clone, Debug)]
pub struct SqlxDatabase {
    pub(super) pool: PgPool,
}

impl SqlxDatabase {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
}

impl From<PgPool> for SqlxDatabase {
    fn from(pool: PgPool) -> Self {
        Self::new(pool)
    }
}

impl From<PgPool> for Database {
    fn from(pool: PgPool) -> Self {
        Database::new(SqlxDatabase::new(pool))
    }
}

impl From<&PgPool> for Database {
    fn from(pool: &PgPool) -> Self {
        Database::new(SqlxDatabase::new(pool.clone()))
    }
}

impl From<SqlxDatabase> for Database {
    fn from(database: SqlxDatabase) -> Self {
        Database::new(database)
    }
}

impl From<sqlx::Error> for DbError {
    fn from(error: sqlx::Error) -> Self {
        if let sqlx::Error::Database(database_error) = &error {
            if let Some(code) = database_error.code() {
                return DbError::with_code(error.to_string(), code);
            }
        }

        DbError::new(error.to_string())
    }
}

impl DatabaseDriver for SqlxDatabase {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn begin<'a>(&'a self) -> crate::BoxFuture<'a, Result<DbTransaction, DbError>> {
        Box::pin(async move {
            let tx = self.pool.begin().await?;
            Ok(DbTransaction::new(Box::new(SqlxTransaction::new(tx))))
        })
    }

    fn listen<'a>(
        &'a self,
        channel: &'a str,
    ) -> crate::BoxFuture<'a, Result<Option<NotificationStream>, DbError>> {
        Box::pin(async move {
            let mut listener = sqlx::postgres::PgListener::connect_with(&self.pool).await?;
            listener.listen(channel).await?;
            let stream = listener.into_stream().map(|result| {
                result
                    .map(|notification| Notification {
                        channel: notification.channel().to_string(),
                        payload: notification.payload().to_string(),
                    })
                    .map_err(Into::into)
            });
            Ok(Some(Box::pin(stream) as NotificationStream))
        })
    }
}