strev-postgres 0.6.0

PostgreSQL backend for strev
Documentation
use async_trait::async_trait;
use serde_json::{Map, Value};
use sqlx::PgPool;
use strev::{CloseError, Message, Outcome, PublishError, Topic};

use crate::schema::ensure_schema;

/// Configuration used to construct a [`PostgresPublisher`].
///
/// Derives `Debug` and `Clone` so the configuration can be logged and reused;
/// both are forwarded to the wrapped [`PgPool`].
#[derive(Debug, Clone)]
pub struct PostgresPublisherConfig {
    /// Connection pool the publisher will execute its statements against.
    pub pool: PgPool,
}

impl PostgresPublisherConfig {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}

/// Publisher that stores messages as rows of the `strev_messages` table.
///
/// Derives `Debug` so the publisher can appear in diagnostics and in
/// `Debug` output of types that embed it.
#[derive(Debug)]
pub struct PostgresPublisher {
    /// Connection pool every insert is executed against.
    pool: PgPool,
}

impl PostgresPublisher {
    pub async fn new(config: PostgresPublisherConfig) -> Result<Self, PublishError> {
        ensure_schema(&config.pool)
            .await
            .map_err(|e| PublishError::Backend(Box::new(e)))?;
        Ok(Self { pool: config.pool })
    }
}

#[async_trait]
impl strev::Publisher for PostgresPublisher {
    /// Inserts `messages` into `strev_messages` under `topic`, one row per
    /// message, in the order given.
    ///
    /// Every successfully inserted message is acked and its [`Outcome`] is
    /// collected into the returned vector.
    ///
    /// # Errors
    ///
    /// On the first failed insert the offending message is nacked (the nack
    /// result is discarded) and [`PublishError::Backend`] is returned right
    /// away. Each insert runs directly against the pool, so rows written
    /// before the failure stay in place; the outcomes gathered so far are
    /// dropped together with the messages that were not yet processed.
    async fn publish(
        &self,
        topic: &Topic,
        messages: Vec<Message>,
    ) -> Result<Vec<Outcome>, PublishError> {
        const INSERT_SQL: &str =
            "INSERT INTO strev_messages (topic, uuid, payload, metadata) VALUES ($1, $2, $3, $4)";

        let mut acked = Vec::with_capacity(messages.len());

        for message in messages {
            // Metadata is stored as a single JSON object of string values.
            let metadata = Value::Object(metadata_to_json(&message));

            let written = sqlx::query(INSERT_SQL)
                .bind(topic.as_str())
                .bind(message.uuid().to_string())
                .bind(message.payload().as_ref())
                .bind(metadata)
                .execute(&self.pool)
                .await;

            // Stop at the first failure: nack the message that could not be stored.
            if let Err(err) = written {
                let _ = message.nack();
                return Err(PublishError::Backend(Box::new(err)));
            }

            acked.push(message.ack());
        }

        Ok(acked)
    }

    /// No-op: always returns `Ok(())`.
    async fn close(&mut self) -> Result<(), CloseError> {
        Ok(())
    }
}

/// Converts a message's metadata into a JSON object map.
///
/// Each metadata entry becomes one key whose value is a JSON string built
/// from the entry's `to_string()` representation.
fn metadata_to_json(msg: &Message) -> Map<String, Value> {
    msg.metadata()
        .iter()
        .map(|(name, content)| (name.to_string(), Value::String(content.to_string())))
        .collect()
}