Skip to main content

ruststream_fred/
broker.rs

1//! The [`RedisBroker`]: the entry point of the `fred` integration.
2
3use std::sync::Arc;
4
5use fred::clients::{Client, Pool};
6use fred::interfaces::{ClientLike, EventInterface, PubsubInterface, StreamsInterface};
7use fred::types::config::{Config, ServerConfig};
8use ruststream::{Broker, DescribeServer, ServerSpec, Subscribe};
9use tokio::sync::OnceCell;
10
11use crate::{
12    error::RedisError,
13    list::{RedisList, RedisListPublisher, RedisListSubscriber},
14    publisher::RedisPublisher,
15    pubsub::{PubSubMode, RedisPubSub, RedisPubSubPublisher, RedisPubSubSubscriber},
16    stream::RedisStream,
17    subscriber::RedisSubscriber,
18};
19
20/// Default `fred` connection-pool size when the caller does not set one.
///
/// Every constructor records this value; [`RedisBroker::pool`] overrides it before the pool is
/// built.
21const DEFAULT_POOL_SIZE: usize = 4;
22
23/// How the broker should connect, recorded synchronously and resolved into a `fred` config at
24/// [`Broker::connect`] time so construction stays I/O- and failure-free.
///
/// Addresses are stored exactly as supplied. They are only parsed when the config is built, so a
/// malformed address surfaces as [`RedisError::Connect`] at that point rather than at
/// construction.
25#[derive(Debug, Clone)]
26enum Topology {
27    /// A single server, addressed by URL (`redis://host:port`).
28    Standalone(String),
29    /// A Redis Cluster, addressed by one or more `host:port` seed nodes.
    /// A seed given without a port falls back to 6379 when the config is built.
30    Cluster(Vec<String>),
31    /// Sentinel-managed replication: the monitored primary's `service` name plus the `host:port`
32    /// of each sentinel.
    /// A sentinel given without a port falls back to 26379 when the config is built.
33    Sentinel { service: String, hosts: Vec<String> },
34    /// A pool supplied already-connected via [`RedisBroker::from_pool`]; no config to build.
35    Preconnected,
36}
37
38/// Parses a `host:port` address (tolerating a `redis://` / `rediss://` scheme prefix) into the
39/// `(host, port)` pair `fred`'s server-config constructors expect. Falls back to `default_port`
40/// when no port is given.
41fn parse_server(addr: &str, default_port: u16) -> Result<(String, u16), RedisError> {
42    let trimmed = addr
43        .trim()
44        .trim_start_matches("rediss://")
45        .trim_start_matches("redis://");
46    let (host, port) = match trimmed.rsplit_once(':') {
47        Some((host, port)) => {
48            let port = port.parse::<u16>().map_err(|_| {
49                RedisError::Connect(format!("invalid port in redis address `{addr}`").into())
50            })?;
51            (host, port)
52        }
53        None => (trimmed, default_port),
54    };
55    if host.is_empty() {
56        return Err(RedisError::Connect(
57            format!("missing host in redis address `{addr}`").into(),
58        ));
59    }
60    Ok((host.to_owned(), port))
61}
62
63fn parse_servers(addrs: &[String], default_port: u16) -> Result<Vec<(String, u16)>, RedisError> {
64    if addrs.is_empty() {
65        return Err(RedisError::Connect("no redis addresses provided".into()));
66    }
67    addrs
68        .iter()
69        .map(|addr| parse_server(addr, default_port))
70        .collect()
71}
72
73/// A Redis broker handle backed by a `fred` connection [`Pool`].
74///
75/// Construct it synchronously with [`RedisBroker::standalone`] (or [`RedisBroker::cluster`] /
76/// [`RedisBroker::sentinel`]) and let the runtime connect it at startup, or eagerly with
77/// [`RedisBroker::connect`] / [`RedisBroker::from_pool`]. The handle is cheap to clone, and clones
78/// share one pool. Subscriptions are opened through [`RedisBroker::subscribe`] with a
/// [`RedisStream`] descriptor.
79///
80/// # Lazy connection
81///
82/// [`standalone`](Self::standalone) performs no I/O: it only records the server address. The pool
83/// is opened by [`Broker::connect`], which the runtime calls once at startup, so a Redis service
84/// can be built with the synchronous `#[ruststream::app]` macro. Publishers handed out before
85/// `connect` resolve the shared pool on first use; operations that need it before `connect` return
86/// [`RedisError::NotConnected`].
87///
88/// # Examples
89///
90/// ```no_run
91/// use ruststream_fred::{RedisBroker, RedisStream};
92///
93/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
94/// let broker = RedisBroker::connect("redis://localhost:6379").await?;
95/// let publisher = broker.publisher();
96/// let sub = broker.subscribe(RedisStream::new("orders").group("workers")).await?;
97/// # let _ = (publisher, sub);
98/// broker.shutdown_pool().await;
99/// # Ok(())
100/// # }
101/// ```
102#[derive(Clone)]
103pub struct RedisBroker {
    /// Set-once slot for the pool, shared by every clone of the broker and by the publishers it
    /// hands out. Empty until `Broker::connect` runs, or pre-filled by `from_pool`.
104    pool: Arc<OnceCell<Pool>>,
    /// Where to connect; turned into a `fred` config by `build_config`.
105    topology: Topology,
    /// Pool size passed to `Pool::new` when `Broker::connect` builds the pool.
106    pool_size: usize,
    /// Consumer group used by the bare-string `Subscribe::subscribe` form, if one was set.
107    default_group: Option<String>,
108}
109
110impl std::fmt::Debug for RedisBroker {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct("RedisBroker")
113            .field("topology", &self.topology)
114            .field("pool_size", &self.pool_size)
115            .field("default_group", &self.default_group)
116            .finish_non_exhaustive()
117    }
118}
119
120impl RedisBroker {
121    /// Creates a standalone-topology broker that connects to `url` when [`Broker::connect`] runs.
122    ///
123    /// Synchronous and performs no I/O, so it slots into the `#[ruststream::app]` builder; the
124    /// connection is opened lazily at startup. See the [type docs](Self#lazy-connection).
125    #[must_use]
126    pub fn standalone(url: impl Into<String>) -> Self {
127        Self::with_topology(Topology::Standalone(url.into()))
128    }
129
130    /// Creates a Redis Cluster broker from one or more `host:port` seed nodes.
131    ///
132    /// Only one reachable node is needed; `fred` discovers the rest of the cluster on connect.
133    /// Synchronous and performs no I/O. See the [type docs](Self#lazy-connection).
134    #[must_use]
135    pub fn cluster(nodes: impl IntoIterator<Item = impl Into<String>>) -> Self {
136        Self::with_topology(Topology::Cluster(
137            nodes.into_iter().map(Into::into).collect(),
138        ))
139    }
140
141    /// Creates a Sentinel-backed broker that tracks the primary named `service`, discovering it
142    /// through the given sentinel `host:port` addresses.
143    ///
144    /// Synchronous and performs no I/O. See the [type docs](Self#lazy-connection).
145    #[must_use]
146    pub fn sentinel(
147        service: impl Into<String>,
148        sentinels: impl IntoIterator<Item = impl Into<String>>,
149    ) -> Self {
150        Self::with_topology(Topology::Sentinel {
151            service: service.into(),
152            hosts: sentinels.into_iter().map(Into::into).collect(),
153        })
154    }
155
156    fn with_topology(topology: Topology) -> Self {
157        Self {
158            pool: Arc::new(OnceCell::new()),
159            topology,
160            pool_size: DEFAULT_POOL_SIZE,
161            default_group: None,
162        }
163    }
164
165    /// Sets the connection-pool size. Defaults to 4.
166    #[must_use]
167    pub const fn pool(mut self, size: usize) -> Self {
168        self.pool_size = size;
169        self
170    }
171
172    /// Sets a broker-wide default consumer group, enabling the bare-string `#[subscriber("key")]`
173    /// form (Redis Streams always read through a group). Without it a bare-string subscription
174    /// returns [`RedisError::InvalidOptions`]; name the group per subscription with
175    /// [`RedisStream::group`] instead.
176    #[must_use]
177    pub fn default_group(mut self, group: impl Into<String>) -> Self {
178        self.default_group = Some(group.into());
179        self
180    }
181
182    /// Connects to a standalone Redis server eagerly, returning an already-connected broker.
183    ///
184    /// # Errors
185    ///
186    /// Returns [`RedisError::Connect`] when the URL is invalid or the connection cannot be
187    /// established.
188    pub async fn connect(url: impl Into<String>) -> Result<Self, RedisError> {
189        let broker = Self::standalone(url);
190        Broker::connect(&broker).await?;
191        Ok(broker)
192    }
193
194    /// Wraps an already-connected `fred` pool. Useful for advanced configuration (TLS, cluster,
195    /// sentinel, custom performance and reconnection policies).
196    #[must_use]
197    pub fn from_pool(pool: Pool) -> Self {
198        Self {
199            pool: Arc::new(OnceCell::new_with(Some(pool))),
200            topology: Topology::Preconnected,
201            pool_size: DEFAULT_POOL_SIZE,
202            default_group: None,
203        }
204    }
205
206    /// Builds the `fred` config for this broker's topology.
207    fn build_config(&self) -> Result<Config, RedisError> {
208        match &self.topology {
209            Topology::Standalone(url) => {
210                Config::from_url(url).map_err(|err| RedisError::Connect(Box::new(err)))
211            }
212            Topology::Cluster(nodes) => {
213                let hosts = parse_servers(nodes, 6379)?;
214                Ok(Config {
215                    server: ServerConfig::new_clustered(hosts),
216                    ..Config::default()
217                })
218            }
219            Topology::Sentinel { service, hosts } => {
220                let hosts = parse_servers(hosts, 26379)?;
221                Ok(Config {
222                    server: ServerConfig::new_sentinel(hosts, service.clone()),
223                    ..Config::default()
224                })
225            }
226            // A preconnected pool never reaches connect()'s init path.
227            Topology::Preconnected => Err(RedisError::NotConnected),
228        }
229    }
230
231    /// The connected pool, or [`RedisError::NotConnected`] when `connect` has not run yet.
232    fn connected(&self) -> Result<Pool, RedisError> {
233        self.pool.get().cloned().ok_or(RedisError::NotConnected)
234    }
235
236    /// Returns a clone of the underlying pool. Useful for advanced operations not covered by the
237    /// wrapper.
238    ///
239    /// # Panics
240    ///
241    /// Panics if the broker has not connected yet (built with [`standalone`](Self::standalone) and
242    /// [`Broker::connect`] not run). Call it after startup, or build with [`connect`](Self::connect)
243    /// / [`from_pool`](Self::from_pool).
244    #[must_use]
245    pub fn pool_handle(&self) -> Pool {
246        self.pool
247            .get()
248            .cloned()
249            .expect("RedisBroker::pool_handle() called before connect()")
250    }
251
252    /// Opens a stream subscription described by `def`.
253    ///
254    /// Ensures the consumer group exists (`XGROUP CREATE ... MKSTREAM`, ignoring an
255    /// already-existing group) before returning the subscriber.
256    ///
257    /// # Errors
258    ///
259    /// Returns [`RedisError::NotConnected`] when the broker has not connected,
260    /// [`RedisError::InvalidOptions`] when `def` names no consumer group, or
261    /// [`RedisError::Subscribe`] when the group cannot be created.
262    pub async fn subscribe(&self, def: RedisStream) -> Result<RedisSubscriber, RedisError> {
263        let pool = self.connected()?;
264        let group = def.group_or_err()?.to_owned();
265        let consumer = def.consumer_or_auto();
266        ensure_group(&pool, def.key(), &group, def.start().as_id()).await?;
267        Ok(RedisSubscriber::new(
268            pool,
269            def.key().to_owned(),
270            group,
271            consumer,
272            def.count_or_default(),
273            def.block_or_default(),
274            def.mode(),
275        ))
276    }
277
278    /// Returns a publisher bound to this broker.
279    ///
280    /// It may be created before [`Broker::connect`] (for example inside the `with_broker` builder);
281    /// it resolves the shared pool when it first publishes.
282    #[must_use]
283    pub fn publisher(&self) -> RedisPublisher {
284        RedisPublisher::new(Arc::clone(&self.pool), self.supports_transactions())
285    }
286
287    /// Whether this topology can offer multi-key transactions. Cluster cannot (buffered keys may
288    /// hash to different nodes), so its publishers reject `begin_transaction`.
289    const fn supports_transactions(&self) -> bool {
290        !matches!(self.topology, Topology::Cluster(_))
291    }
292
293    /// Builds and connects a dedicated `fred` client (used for Pub/Sub, which needs an isolated
294    /// message stream and channel state per subscriber).
295    async fn new_client(&self) -> Result<Client, RedisError> {
296        let config = self.build_config()?;
297        let client = Client::new(config, None, None, None);
298        client
299            .init()
300            .await
301            .map_err(|err| RedisError::Connect(Box::new(err)))?;
302        Ok(client)
303    }
304
305    /// Opens a Pub/Sub subscription described by `def` on a dedicated client.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`RedisError::InvalidOptions`] for an invalid mode/pattern combination,
310    /// [`RedisError::Connect`] when the dedicated client cannot connect, or
311    /// [`RedisError::Subscribe`] when the subscribe command fails.
312    pub async fn subscribe_pubsub(
313        &self,
314        def: RedisPubSub,
315    ) -> Result<RedisPubSubSubscriber, RedisError> {
316        def.validate()?;
317        let codec = def.codec_handle();
318        let client = self.new_client().await?;
319        let channel = def.channel().to_owned();
320        let result = match (def.delivery_mode(), def.is_pattern()) {
321            (PubSubMode::Classic, true) => client.psubscribe(channel).await,
322            (PubSubMode::Classic, false) => client.subscribe(channel).await,
323            (PubSubMode::Sharded, _) => client.ssubscribe(channel).await,
324        };
325        result.map_err(RedisError::subscribe)?;
326        let rx = client.message_rx();
327        Ok(RedisPubSubSubscriber::new(client, rx, codec))
328    }
329
330    /// Opens a list (work-queue) subscription described by `def`.
331    ///
332    /// # Errors
333    ///
334    /// Returns [`RedisError::NotConnected`] when the broker has not connected.
335    #[allow(
336        clippy::unused_async,
337        reason = "async for parity with the other subscribe methods and the SubscriptionSource shape"
338    )]
339    pub async fn subscribe_list(&self, def: RedisList) -> Result<RedisListSubscriber, RedisError> {
340        let pool = self.connected()?;
341        Ok(RedisListSubscriber::new(
342            pool,
343            def.key().to_owned(),
344            def.is_reliable(),
345            def.processing_or_default(),
346            def.block_or_default(),
347            def.codec_handle(),
348        ))
349    }
350
351    /// Returns a Pub/Sub publisher (classic mode by default; override with
352    /// [`RedisPubSubPublisher::mode`]).
353    #[must_use]
354    pub fn pubsub_publisher(&self) -> RedisPubSubPublisher {
355        RedisPubSubPublisher::new(Arc::clone(&self.pool), PubSubMode::Classic)
356    }
357
358    /// Returns a list publisher (`LPUSH`).
359    #[must_use]
360    pub fn list_publisher(&self) -> RedisListPublisher {
361        RedisListPublisher::new(Arc::clone(&self.pool))
362    }
363
364    /// Closes the underlying pool. A no-op if the broker never connected.
365    pub async fn shutdown_pool(&self) {
366        if let Some(pool) = self.pool.get() {
367            let _ = pool.quit().await;
368        }
369    }
370}
371
372/// Creates the consumer group, treating an already-existing group as success.
373async fn ensure_group(
374    pool: &Pool,
375    key: &str,
376    group: &str,
377    start_id: &str,
378) -> Result<(), RedisError> {
379    let result: Result<String, fred::error::Error> =
380        pool.xgroup_create(key, group, start_id, true).await;
381    match result {
382        Ok(_) => Ok(()),
383        // BUSYGROUP: the group already exists, which is the steady-state case.
384        Err(err) if err.details().contains("BUSYGROUP") => Ok(()),
385        Err(err) => Err(RedisError::subscribe(err)),
386    }
387}
388
389impl Broker for RedisBroker {
390    type Error = RedisError;
391
392    async fn connect(&self) -> Result<(), Self::Error> {
393        self.pool
394            .get_or_try_init(|| async {
395                let config = self.build_config()?;
396                let pool = Pool::new(config, None, None, None, self.pool_size)
397                    .map_err(|err| RedisError::Connect(Box::new(err)))?;
398                pool.init()
399                    .await
400                    .map_err(|err| RedisError::Connect(Box::new(err)))?;
401                Ok(pool)
402            })
403            .await?;
404        Ok(())
405    }
406
407    async fn shutdown(&self) -> Result<(), Self::Error> {
408        self.shutdown_pool().await;
409        Ok(())
410    }
411}
412
413// By-name subscription capability for the bare string `#[subscriber("key")]` form. Redis Streams
414// always read through a consumer group, so this requires a broker-wide default group.
415#[allow(clippy::use_self)]
416impl Subscribe for RedisBroker {
417    type Subscriber = RedisSubscriber;
418
419    async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error> {
420        let group = self.default_group.clone().ok_or_else(|| {
421            RedisError::InvalidOptions(format!(
422                "bare-string subscription on `{name}` needs a broker-wide default group: \
423                 call RedisBroker::default_group(name), or subscribe with \
424                 RedisStream::new(name).group(group)"
425            ))
426        })?;
427        RedisBroker::subscribe(self, RedisStream::new(name).group(group)).await
428    }
429}
430
431/// `DescribeServer` reports the configured Redis address (the first seed for cluster/sentinel).
432impl DescribeServer for RedisBroker {
433    fn describe_server(&self) -> ServerSpec {
434        let host = match &self.topology {
435            Topology::Standalone(url) => url
436                .trim_start_matches("rediss://")
437                .trim_start_matches("redis://")
438                .to_owned(),
439            Topology::Cluster(nodes) => nodes.first().cloned().unwrap_or_default(),
440            Topology::Sentinel { hosts, .. } => hosts.first().cloned().unwrap_or_default(),
441            Topology::Preconnected => String::new(),
442        };
443        ServerSpec::new(host, "redis")
444    }
445}
446
#[cfg(test)]
mod tests {
    use ruststream::{OutgoingMessage, Publisher};

    use super::*;

    // `standalone` records the address without connecting, so operations needing the connection
    // fail cleanly until `Broker::connect` runs. No server required.
    #[tokio::test]
    async fn standalone_does_not_connect() {
        let broker = RedisBroker::standalone("redis://127.0.0.1:6379");

        let publish_err = broker
            .publisher()
            .publish(OutgoingMessage::new("orders", b"{}".as_slice()))
            .await
            .unwrap_err();
        assert!(matches!(publish_err, RedisError::NotConnected));

        let subscribe_err = broker
            .subscribe(RedisStream::new("orders").group("g"))
            .await
            .unwrap_err();
        assert!(matches!(subscribe_err, RedisError::NotConnected));
    }

    #[tokio::test]
    async fn bare_string_subscription_needs_default_group() {
        let broker = RedisBroker::standalone("redis://127.0.0.1:6379");
        let err = Subscribe::subscribe(&broker, "orders").await.unwrap_err();
        assert!(matches!(err, RedisError::InvalidOptions(msg) if msg.contains("default group")));
    }

    #[test]
    fn describe_server_reports_redis() {
        let broker = RedisBroker::standalone("redis://localhost:6379");
        let spec = broker.describe_server();
        assert_eq!(spec.protocol, "redis");
        assert_eq!(spec.host, "localhost:6379");
    }

    // Cluster and sentinel brokers describe themselves by their first seed address.
    #[test]
    fn describe_server_reports_first_cluster_seed() {
        let broker = RedisBroker::cluster(["node-a:7000", "node-b:7001"]);
        assert_eq!(broker.describe_server().host, "node-a:7000");
    }

    #[test]
    fn parse_server_splits_host_and_port() {
        assert!(matches!(
            parse_server("localhost:6380", 6379),
            Ok((host, 6380)) if host == "localhost"
        ));
    }

    // A scheme prefix is tolerated and a missing port falls back to the supplied default.
    #[test]
    fn parse_server_applies_default_port() {
        assert!(matches!(
            parse_server("redis://cache", 26379),
            Ok((host, 26379)) if host == "cache"
        ));
    }

    #[test]
    fn parse_server_rejects_bad_port_and_missing_host() {
        assert!(matches!(
            parse_server("cache:notaport", 6379),
            Err(RedisError::Connect(_))
        ));
        assert!(matches!(
            parse_server(":6379", 6379),
            Err(RedisError::Connect(_))
        ));
    }

    #[test]
    fn parse_servers_rejects_empty_list() {
        assert!(matches!(
            parse_servers(&[], 6379),
            Err(RedisError::Connect(_))
        ));
    }

    // Only the cluster topology opts out of multi-key transactions.
    #[test]
    fn only_cluster_disables_transactions() {
        assert!(RedisBroker::standalone("redis://localhost:6379").supports_transactions());
        assert!(RedisBroker::sentinel("primary", ["sentinel-a:26379"]).supports_transactions());
        assert!(!RedisBroker::cluster(["node-a:7000"]).supports_transactions());
    }
}