Skip to main content

ruststream_fred/
publisher.rs

1//! Publishes messages to Redis streams via `XADD`, with optional pipelined transactions.
2
3use std::fmt::{Debug, Formatter};
4use std::sync::{Arc, Mutex};
5
6use fred::clients::Pool;
7use fred::interfaces::StreamsInterface;
8use ruststream::{OutgoingMessage, Publisher, TransactionalPublisher};
9use tokio::sync::OnceCell;
10
11use crate::{convert::fields_for_publish, error::RedisError};
12
13/// One buffered `XADD` (stream key plus its encoded entry fields), held while a transaction is open.
14type Buffered = (String, Vec<(String, Vec<u8>)>);
15
16/// Redis publisher built on a shared `fred` connection pool. Cheap to clone.
17///
18/// Holds the broker's shared connection cell, so a publisher created before the broker connects
19/// resolves the pool on first use; publishing before
20/// [`Broker::connect`](ruststream::Broker::connect) returns [`RedisError::NotConnected`].
21///
22/// [`Publisher::publish`] appends the message to the stream named by
23/// [`OutgoingMessage::name`](ruststream::OutgoingMessage::name) with `XADD <name> * ...`. The
24/// payload and headers are encoded as entry fields (see [`crate::RedisStream`] for the consuming
25/// side).
26///
27/// # Transactions
28///
29/// On standalone and sentinel topologies the publisher implements [`TransactionalPublisher`]:
30/// [`begin_transaction`](TransactionalPublisher::begin_transaction) starts buffering published
31/// messages, [`commit`](TransactionalPublisher::commit) flushes the buffer in publish order through
32/// a single `fred` pipeline, and [`abort`](TransactionalPublisher::abort) discards it. Cluster does
33/// not support it (buffered keys may live on different nodes), so `begin_transaction` returns
34/// [`RedisError::InvalidOptions`] there. Clones of a handle share the same open transaction buffer.
35#[derive(Clone)]
36pub struct RedisPublisher {
37    pool: Arc<OnceCell<Pool>>,
38    transactions_supported: bool,
39    txn: Arc<Mutex<Option<Vec<Buffered>>>>,
40}
41
42impl Debug for RedisPublisher {
43    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("RedisPublisher")
45            .field("transactions_supported", &self.transactions_supported)
46            .finish_non_exhaustive()
47    }
48}
49
50impl RedisPublisher {
51    pub(crate) fn new(pool: Arc<OnceCell<Pool>>, transactions_supported: bool) -> Self {
52        Self {
53            pool,
54            transactions_supported,
55            txn: Arc::new(Mutex::new(None)),
56        }
57    }
58
59    pub(crate) fn pool(&self) -> Result<Pool, RedisError> {
60        self.pool.get().cloned().ok_or(RedisError::NotConnected)
61    }
62
63    /// Buffers `entry` if a transaction is open and returns `true`; otherwise leaves it for an
64    /// immediate publish.
65    fn buffer_if_in_txn(&self, entry: &Buffered) -> bool {
66        let mut guard = self.txn.lock().expect("redis publisher mutex poisoned");
67        let buffered = guard.as_mut().is_some_and(|buffer| {
68            buffer.push(entry.clone());
69            true
70        });
71        drop(guard);
72        buffered
73    }
74}
75
76impl Publisher for RedisPublisher {
77    type Error = RedisError;
78
79    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
80        let entry: Buffered = (
81            msg.name().to_owned(),
82            fields_for_publish(msg.payload(), msg.headers()),
83        );
84        if self.buffer_if_in_txn(&entry) {
85            return Ok(());
86        }
87        let pool = self.pool()?;
88        let (key, fields) = entry;
89        let _: String = pool
90            .xadd(key, false, None::<()>, "*", fields)
91            .await
92            .map_err(RedisError::publish)?;
93        Ok(())
94    }
95}
96
97impl TransactionalPublisher for RedisPublisher {
98    /// Starts buffering. A no-op if a transaction is already open (it continues).
99    ///
100    /// # Errors
101    ///
102    /// Returns [`RedisError::InvalidOptions`] on a cluster topology, which cannot offer
103    /// multi-key transactions.
104    async fn begin_transaction(&self) -> Result<(), Self::Error> {
105        if !self.transactions_supported {
106            return Err(RedisError::InvalidOptions(
107                "transactions are only supported on standalone and sentinel topologies".to_owned(),
108            ));
109        }
110        let mut guard = self.txn.lock().expect("redis publisher mutex poisoned");
111        if guard.is_none() {
112            *guard = Some(Vec::new());
113        }
114        drop(guard);
115        Ok(())
116    }
117
118    /// Flushes the buffered `XADD`s in publish order through one pipeline, then clears the
119    /// transaction. A commit with no open transaction is a no-op.
120    ///
121    /// # Errors
122    ///
123    /// Returns [`RedisError::NotConnected`] if the broker never connected, or
124    /// [`RedisError::Publish`] if the pipeline fails. On failure the buffer is already cleared.
125    async fn commit(&self) -> Result<(), Self::Error> {
126        let buffered = self
127            .txn
128            .lock()
129            .expect("redis publisher mutex poisoned")
130            .take();
131        let Some(buffered) = buffered else {
132            return Ok(());
133        };
134        if buffered.is_empty() {
135            return Ok(());
136        }
137        let pool = self.pool()?;
138        let pipeline = pool.next().pipeline();
139        for (key, fields) in buffered {
140            let _: () = pipeline
141                .xadd(key, false, None::<()>, "*", fields)
142                .await
143                .map_err(RedisError::publish)?;
144        }
145        let _: Vec<fred::types::Value> = pipeline.all().await.map_err(RedisError::publish)?;
146        Ok(())
147    }
148
149    /// Discards the buffered messages. An abort with no open transaction is a no-op.
150    async fn abort(&self) -> Result<(), Self::Error> {
151        self.txn
152            .lock()
153            .expect("redis publisher mutex poisoned")
154            .take();
155        Ok(())
156    }
157}