Skip to main content

strev_postgres/
publisher.rs

1use async_trait::async_trait;
2use serde_json::{Map, Value};
3use sqlx::PgPool;
4use strev::{CloseError, Message, Outcome, PublishError, Topic};
5
6use crate::schema::ensure_schema;
7
8pub struct PostgresPublisherConfig {
9    pub pool: PgPool,
10}
11
12impl PostgresPublisherConfig {
13    pub fn new(pool: PgPool) -> Self {
14        Self { pool }
15    }
16}
17
18pub struct PostgresPublisher {
19    pool: PgPool,
20}
21
22impl PostgresPublisher {
23    pub async fn new(config: PostgresPublisherConfig) -> Result<Self, PublishError> {
24        ensure_schema(&config.pool)
25            .await
26            .map_err(|e| PublishError::Backend(Box::new(e)))?;
27        Ok(Self { pool: config.pool })
28    }
29}
30
31#[async_trait]
32impl strev::Publisher for PostgresPublisher {
33    async fn publish(
34        &self,
35        topic: &Topic,
36        messages: Vec<Message>,
37    ) -> Result<Vec<Outcome>, PublishError> {
38        let mut outcomes = Vec::with_capacity(messages.len());
39
40        for msg in messages {
41            let metadata = metadata_to_json(&msg);
42
43            let result =
44                sqlx::query("INSERT INTO strev_messages (topic, uuid, payload, metadata) VALUES ($1, $2, $3, $4)")
45                    .bind(topic.as_str())
46                    .bind(msg.uuid().to_string())
47                    .bind(msg.payload().as_ref())
48                    .bind(Value::Object(metadata))
49                    .execute(&self.pool)
50                    .await;
51
52            match result {
53                Ok(_) => outcomes.push(msg.ack()),
54                Err(e) => {
55                    let _ = msg.nack();
56                    return Err(PublishError::Backend(Box::new(e)));
57                }
58            }
59        }
60
61        Ok(outcomes)
62    }
63
64    async fn close(&mut self) -> Result<(), CloseError> {
65        Ok(())
66    }
67}
68
69fn metadata_to_json(msg: &Message) -> Map<String, Value> {
70    let mut map = Map::new();
71    for (key, value) in msg.metadata().iter() {
72        map.insert(key.to_string(), Value::String(value.to_string()));
73    }
74    map
75}