strev_postgres/
publisher.rs1use async_trait::async_trait;
2use serde_json::{Map, Value};
3use sqlx::PgPool;
4use strev::{CloseError, Message, Outcome, PublishError, Topic};
5
6use crate::schema::ensure_schema;
7
8pub struct PostgresPublisherConfig {
9 pub pool: PgPool,
10}
11
12impl PostgresPublisherConfig {
13 pub fn new(pool: PgPool) -> Self {
14 Self { pool }
15 }
16}
17
18pub struct PostgresPublisher {
19 pool: PgPool,
20}
21
22impl PostgresPublisher {
23 pub async fn new(config: PostgresPublisherConfig) -> Result<Self, PublishError> {
24 ensure_schema(&config.pool)
25 .await
26 .map_err(|e| PublishError::Backend(Box::new(e)))?;
27 Ok(Self { pool: config.pool })
28 }
29}
30
31#[async_trait]
32impl strev::Publisher for PostgresPublisher {
33 async fn publish(
34 &self,
35 topic: &Topic,
36 messages: Vec<Message>,
37 ) -> Result<Vec<Outcome>, PublishError> {
38 let mut outcomes = Vec::with_capacity(messages.len());
39
40 for msg in messages {
41 let metadata = metadata_to_json(&msg);
42
43 let result =
44 sqlx::query("INSERT INTO strev_messages (topic, uuid, payload, metadata) VALUES ($1, $2, $3, $4)")
45 .bind(topic.as_str())
46 .bind(msg.uuid().to_string())
47 .bind(msg.payload().as_ref())
48 .bind(Value::Object(metadata))
49 .execute(&self.pool)
50 .await;
51
52 match result {
53 Ok(_) => outcomes.push(msg.ack()),
54 Err(e) => {
55 let _ = msg.nack();
56 return Err(PublishError::Backend(Box::new(e)));
57 }
58 }
59 }
60
61 Ok(outcomes)
62 }
63
64 async fn close(&mut self) -> Result<(), CloseError> {
65 Ok(())
66 }
67}
68
69fn metadata_to_json(msg: &Message) -> Map<String, Value> {
70 let mut map = Map::new();
71 for (key, value) in msg.metadata().iter() {
72 map.insert(key.to_string(), Value::String(value.to_string()));
73 }
74 map
75}