Skip to main content

ruststream_fred/
pubsub.rs

1//! Redis Pub/Sub transport: fire-and-forget fan-out with no acknowledgement.
2//!
3//! Unlike Streams, Pub/Sub has no durability, no consumer groups, and no ack: a message reaches
4//! whichever subscribers are connected at publish time, and `ack` / `nack` report
5//! [`AckError::Unsupported`]. Two delivery modes exist, explicit because they do not interoperate:
6//!
7//! * [`PubSubMode::Classic`] - `SUBSCRIBE` / `PUBLISH`, broadcast to every node; supports patterns
8//!   (`PSUBSCRIBE`). The only option on standalone and sentinel.
9//! * [`PubSubMode::Sharded`] - `SSUBSCRIBE` / `SPUBLISH` (Redis 7+), slot-local so it scales across
10//!   a cluster, but has no pattern support.
11//!
12//! Headers travel in a frame around the payload (see [`crate::envelope`]): a lossless binary frame
13//! by default, or a readable codec-serialized envelope when a codec is set with
14//! [`RedisPubSub::codec`] / [`RedisPubSubPublisher::codec`].
15
16use std::fmt::{Debug, Formatter};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use fred::clients::Client;
21use fred::interfaces::{ClientLike, PubsubInterface};
22use fred::types::Message;
23use futures::Stream;
24use futures::stream::unfold;
25use ruststream::codec::Codec;
26use ruststream::{
27    AckError, Headers, IncomingMessage, OutgoingMessage, Partitioned, Publisher, SubscriptionSource,
28};
29use tokio::sync::OnceCell;
30use tokio::sync::broadcast::{Receiver, error::RecvError};
31
32use crate::envelope::{SharedEnvelope, frame, unframe};
33use crate::{RedisBroker, error::RedisError, message::PARTITION_KEY_HEADER};
34
35/// Pub/Sub delivery mode. Defaults to [`Classic`](Self::Classic).
36#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
37pub enum PubSubMode {
38    /// `SUBSCRIBE` / `PUBLISH`: cluster-wide broadcast, pattern-capable, does not scale by slot.
39    #[default]
40    Classic,
41    /// `SSUBSCRIBE` / `SPUBLISH` (Redis 7+): slot-local sharded delivery, no patterns.
42    Sharded,
43}
44
45/// Describes one Pub/Sub subscription against [`crate::RedisBroker`].
46///
47/// # Examples
48///
49/// ```
50/// use ruststream_fred::{PubSubMode, RedisPubSub};
51///
52/// let classic = RedisPubSub::new("events");
53/// let sharded = RedisPubSub::new("events").mode(PubSubMode::Sharded);
54/// let pattern = RedisPubSub::new("events.*").pattern(); // classic only
55/// # let _ = (classic, sharded, pattern);
56/// ```
57#[derive(Clone)]
58#[must_use]
59pub struct RedisPubSub {
60    channel: String,
61    mode: PubSubMode,
62    pattern: bool,
63    codec: Option<SharedEnvelope>,
64}
65
66impl Debug for RedisPubSub {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("RedisPubSub")
69            .field("channel", &self.channel)
70            .field("mode", &self.mode)
71            .field("pattern", &self.pattern)
72            .field("codec", &self.codec.is_some())
73            .finish()
74    }
75}
76
77impl RedisPubSub {
78    /// A subscription on `channel` (an exact channel by default; see [`pattern`](Self::pattern)).
79    pub fn new(channel: impl Into<String>) -> Self {
80        Self {
81            channel: channel.into(),
82            mode: PubSubMode::default(),
83            pattern: false,
84            codec: None,
85        }
86    }
87
88    /// Sets the delivery mode. Defaults to [`PubSubMode::Classic`].
89    pub const fn mode(mut self, mode: PubSubMode) -> Self {
90        self.mode = mode;
91        self
92    }
93
94    /// Treats the channel as a glob pattern (`PSUBSCRIBE`). Classic mode only; combining it with
95    /// [`PubSubMode::Sharded`] is rejected at subscribe time.
96    pub const fn pattern(mut self) -> Self {
97        self.pattern = true;
98        self
99    }
100
101    /// Decodes the header/payload envelope with `codec` (must match the publisher). Without it the
102    /// default lossless binary framing is used.
103    pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
104        self.codec = Some(Arc::new(codec));
105        self
106    }
107
108    /// The channel (or pattern) this subscription listens on.
109    #[must_use]
110    pub fn channel(&self) -> &str {
111        &self.channel
112    }
113
114    pub(crate) const fn delivery_mode(&self) -> PubSubMode {
115        self.mode
116    }
117
118    pub(crate) const fn is_pattern(&self) -> bool {
119        self.pattern
120    }
121
122    pub(crate) fn codec_handle(&self) -> Option<SharedEnvelope> {
123        self.codec.clone()
124    }
125
126    pub(crate) fn validate(&self) -> Result<(), RedisError> {
127        if self.pattern && matches!(self.mode, PubSubMode::Sharded) {
128            return Err(RedisError::InvalidOptions(
129                "pattern subscriptions are classic-only; sharded pub/sub has no PSUBSCRIBE"
130                    .to_owned(),
131            ));
132        }
133        Ok(())
134    }
135}
136
137impl SubscriptionSource<RedisBroker> for RedisPubSub {
138    type Subscriber = RedisPubSubSubscriber;
139
140    fn name(&self) -> &str {
141        self.channel()
142    }
143
144    async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
145        broker.subscribe_pubsub(self).await
146    }
147}
148
149/// A Pub/Sub subscription backed by a dedicated `fred` client, so its message stream and channel
150/// state are isolated from other subscribers and from the publishing pool.
151pub struct RedisPubSubSubscriber {
152    client: Client,
153    rx: Receiver<Message>,
154    codec: Option<SharedEnvelope>,
155}
156
157impl Debug for RedisPubSubSubscriber {
158    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct("RedisPubSubSubscriber")
160            .finish_non_exhaustive()
161    }
162}
163
164impl RedisPubSubSubscriber {
165    pub(crate) fn new(
166        client: Client,
167        rx: Receiver<Message>,
168        codec: Option<SharedEnvelope>,
169    ) -> Self {
170        Self { client, rx, codec }
171    }
172}
173
174impl Drop for RedisPubSubSubscriber {
175    fn drop(&mut self) {
176        // The dedicated client owns a background connection task; close it on a detached task since
177        // `drop` cannot await.
178        let client = self.client.clone();
179        tokio::spawn(async move {
180            let _ = client.quit().await;
181        });
182    }
183}
184
185fn to_message(msg: &Message, codec: Option<&SharedEnvelope>) -> RedisPubSubMessage {
186    let raw = msg.value.as_bytes().unwrap_or(&[]);
187    let (payload, headers) = unframe(codec, raw);
188    RedisPubSubMessage {
189        channel: msg.channel.to_string(),
190        payload,
191        headers,
192    }
193}
194
195impl ruststream::Subscriber for RedisPubSubSubscriber {
196    type Message = RedisPubSubMessage;
197    type Error = RedisError;
198
199    /// Yields one message per Pub/Sub delivery.
200    ///
201    /// # Cancel safety
202    ///
203    /// Dropping the returned stream between items is safe. Because Pub/Sub has no buffering, any
204    /// message published while no stream is polling is lost (this is Redis Pub/Sub semantics, not a
205    /// limitation of this client).
206    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
207        let codec = self.codec.clone();
208        unfold((&mut self.rx, codec), |(rx, codec)| async move {
209            loop {
210                match rx.recv().await {
211                    Ok(msg) => {
212                        let message = to_message(&msg, codec.as_ref());
213                        return Some((Ok(message), (rx, codec)));
214                    }
215                    // The receiver fell behind the broadcast buffer; skip the gap and keep reading.
216                    Err(RecvError::Lagged(_)) => {}
217                    Err(RecvError::Closed) => return None,
218                }
219            }
220        })
221    }
222}
223
224/// A Pub/Sub delivery. `ack` / `nack` are unsupported (Pub/Sub has no acknowledgement).
225pub struct RedisPubSubMessage {
226    channel: String,
227    payload: Bytes,
228    headers: Headers,
229}
230
231impl Debug for RedisPubSubMessage {
232    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
233        f.debug_struct("RedisPubSubMessage")
234            .field("channel", &self.channel)
235            .field("payload_len", &self.payload.len())
236            .finish_non_exhaustive()
237    }
238}
239
240impl RedisPubSubMessage {
241    /// The channel this message arrived on.
242    #[must_use]
243    pub fn channel(&self) -> &str {
244        &self.channel
245    }
246}
247
248impl IncomingMessage for RedisPubSubMessage {
249    fn payload(&self) -> &[u8] {
250        &self.payload
251    }
252
253    fn headers(&self) -> &Headers {
254        &self.headers
255    }
256
257    async fn ack(self) -> Result<(), AckError> {
258        Err(AckError::Unsupported)
259    }
260
261    async fn nack(self, _requeue: bool) -> Result<(), AckError> {
262        Err(AckError::Unsupported)
263    }
264}
265
266impl Partitioned for RedisPubSubMessage {
267    fn partition_key(&self) -> Option<&[u8]> {
268        self.headers().get(PARTITION_KEY_HEADER)
269    }
270}
271
272/// Publishes Pub/Sub messages with `PUBLISH` (classic) or `SPUBLISH` (sharded).
273///
274/// Obtain it from [`RedisBroker::pubsub_publisher`](crate::RedisBroker::pubsub_publisher). The
275/// publish mode must match how subscribers subscribed: a sharded publish only reaches sharded
276/// subscribers. Headers are framed around the payload; set a [`codec`](Self::codec) for a readable
277/// wire format (it must match the subscriber's).
278#[derive(Clone)]
279pub struct RedisPubSubPublisher {
280    pool: Arc<OnceCell<fred::clients::Pool>>,
281    mode: PubSubMode,
282    codec: Option<SharedEnvelope>,
283}
284
285impl Debug for RedisPubSubPublisher {
286    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
287        f.debug_struct("RedisPubSubPublisher")
288            .field("mode", &self.mode)
289            .field("codec", &self.codec.is_some())
290            .finish_non_exhaustive()
291    }
292}
293
294impl RedisPubSubPublisher {
295    pub(crate) fn new(pool: Arc<OnceCell<fred::clients::Pool>>, mode: PubSubMode) -> Self {
296        Self {
297            pool,
298            mode,
299            codec: None,
300        }
301    }
302
303    /// Sets the publish mode. Defaults to whatever
304    /// [`RedisBroker::pubsub_publisher`](crate::RedisBroker::pubsub_publisher) selected.
305    #[must_use]
306    pub const fn mode(mut self, mode: PubSubMode) -> Self {
307        self.mode = mode;
308        self
309    }
310
311    /// Serializes the header/payload envelope with `codec` (must match the subscriber). Without it
312    /// the default lossless binary framing is used.
313    #[must_use]
314    pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
315        self.codec = Some(Arc::new(codec));
316        self
317    }
318}
319
320impl Publisher for RedisPubSubPublisher {
321    type Error = RedisError;
322
323    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
324        let pool = self.pool.get().cloned().ok_or(RedisError::NotConnected)?;
325        let client = pool.next();
326        let channel = msg.name().to_owned();
327        let body = frame(self.codec.as_ref(), msg.payload(), msg.headers());
328        let _: i64 = match self.mode {
329            PubSubMode::Classic => client.publish(channel, body).await,
330            PubSubMode::Sharded => client.spublish(channel, body).await,
331        }
332        .map_err(RedisError::publish)?;
333        Ok(())
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340
341    #[test]
342    fn pattern_with_sharded_is_rejected() {
343        let err = RedisPubSub::new("e.*")
344            .mode(PubSubMode::Sharded)
345            .pattern()
346            .validate()
347            .unwrap_err();
348        assert!(matches!(err, RedisError::InvalidOptions(msg) if msg.contains("classic-only")));
349    }
350
351    #[test]
352    fn classic_pattern_validates() {
353        RedisPubSub::new("e.*").pattern().validate().expect("ok");
354    }
355}