Skip to main content

ruststream_fred/
broker.rs

1//! The [`RedisBroker`]: the entry point of the `fred` integration.
2
3use std::sync::Arc;
4
5use fred::clients::{Client, Pool};
6use fred::interfaces::{ClientLike, EventInterface, PubsubInterface, StreamsInterface};
7#[cfg(feature = "credential-provider")]
8use fred::types::config::CredentialProvider;
9#[cfg(any(
10    feature = "tls-rustls",
11    feature = "tls-rustls-ring",
12    feature = "tls-native-tls"
13))]
14use fred::types::config::TlsConfig;
15use fred::types::config::{Config, ServerConfig};
16use ruststream::{Broker, DescribeServer, ServerSpec, Subscribe};
17use tokio::sync::OnceCell;
18
19use crate::{
20    error::RedisError,
21    list::{RedisList, RedisListPublisher, RedisListSubscriber},
22    publisher::RedisPublisher,
23    pubsub::{PubSubMode, RedisPubSub, RedisPubSubPublisher, RedisPubSubSubscriber},
24    stream::RedisStream,
25    subscriber::RedisSubscriber,
26};
27
/// Default `fred` connection-pool size when the caller does not set one via
/// [`RedisBroker::pool`]. Brokers wrapping an existing pool ([`RedisBroker::from_pool`]) record
/// it too, although their pool is already built.
const DEFAULT_POOL_SIZE: usize = 4;
30
/// How the broker should connect, recorded synchronously and resolved into a `fred` config at
/// [`Broker::connect`] time so construction stays I/O- and failure-free.
///
/// `Debug` is written by hand (below) so that a password embedded in a standalone URL is never
/// printed.
#[derive(Clone)]
enum Topology {
    /// A single server, addressed by URL (`redis://host:port`).
    Standalone(String),
    /// A Redis Cluster, addressed by one or more `host:port` seed nodes.
    Cluster(Vec<String>),
    /// Sentinel-managed replication: the monitored primary's `service` name plus the `host:port`
    /// of each sentinel.
    Sentinel { service: String, hosts: Vec<String> },
    /// A pool supplied already-connected via [`RedisBroker::from_pool`]; no config to build.
    Preconnected,
}

// Mirrors the output a derived `Debug` would produce, except that the password part of a
// standalone URL's userinfo (`redis://user:pass@host`) is replaced by a placeholder. The username
// is an identifier rather than a secret and stays visible.
impl std::fmt::Debug for Topology {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Standalone(url) => f
                .debug_tuple("Standalone")
                .field(&Self::redact_url_password(url))
                .finish(),
            Self::Cluster(nodes) => f.debug_tuple("Cluster").field(nodes).finish(),
            Self::Sentinel { service, hosts } => f
                .debug_struct("Sentinel")
                .field("service", service)
                .field("hosts", hosts)
                .finish(),
            Self::Preconnected => f.write_str("Preconnected"),
        }
    }
}

impl Topology {
    /// Returns `url` with the password of its `user:password@` userinfo replaced by
    /// `<redacted>`. URLs without a password are returned unchanged.
    fn redact_url_password(url: &str) -> String {
        // The authority runs from just after `scheme://` (or the start of the string when there
        // is no scheme) up to the first `/`, `?` or `#`.
        let authority_start = url.find("://").map_or(0, |idx| idx + 3);
        let rest = &url[authority_start..];
        let authority_len = rest.find(&['/', '?', '#'][..]).unwrap_or(rest.len());
        let authority = &rest[..authority_len];

        // Userinfo ends at the last `@` of the authority; the password follows its first `:`.
        let at = match authority.rfind('@') {
            Some(at) => at,
            None => return url.to_owned(),
        };
        let colon = match authority[..at].find(':') {
            Some(colon) => colon,
            None => return url.to_owned(),
        };

        let password_start = authority_start + colon + 1;
        let password_end = authority_start + at;
        format!(
            "{}<redacted>{}",
            &url[..password_start],
            &url[password_end..]
        )
    }
}
45
46/// Parses a `host:port` address (tolerating a `redis://` / `rediss://` scheme prefix) into the
47/// `(host, port)` pair `fred`'s server-config constructors expect. Falls back to `default_port`
48/// when no port is given.
49fn parse_server(addr: &str, default_port: u16) -> Result<(String, u16), RedisError> {
50    let trimmed = addr
51        .trim()
52        .trim_start_matches("rediss://")
53        .trim_start_matches("redis://");
54    let (host, port) = match trimmed.rsplit_once(':') {
55        Some((host, port)) => {
56            let port = port.parse::<u16>().map_err(|_| {
57                RedisError::Connect(format!("invalid port in redis address `{addr}`").into())
58            })?;
59            (host, port)
60        }
61        None => (trimmed, default_port),
62    };
63    if host.is_empty() {
64        return Err(RedisError::Connect(
65            format!("missing host in redis address `{addr}`").into(),
66        ));
67    }
68    Ok((host.to_owned(), port))
69}
70
71fn parse_servers(addrs: &[String], default_port: u16) -> Result<Vec<(String, u16)>, RedisError> {
72    if addrs.is_empty() {
73        return Err(RedisError::Connect("no redis addresses provided".into()));
74    }
75    addrs
76        .iter()
77        .map(|addr| parse_server(addr, default_port))
78        .collect()
79}
80
/// Authentication and TLS settings recorded on the broker and folded into the `fred` [`Config`]
/// at connect time, on every topology. Fields with no value are left untouched, so credentials
/// supplied through a standalone `redis://user:pass@host` URL survive unless overridden here.
///
/// Every field defaults to `None` (the struct derives `Default`). `Debug` is implemented by hand
/// rather than derived so that passwords are not printed.
#[derive(Clone, Default)]
struct AuthConfig {
    /// ACL username for the data nodes (`Config.username`).
    username: Option<String>,
    /// Password for the data nodes (`Config.password`).
    password: Option<String>,
    /// ACL username for authenticating to the sentinels, distinct from the data-node username.
    /// Only applied when the built config uses a sentinel server.
    #[cfg(feature = "sentinel-auth")]
    sentinel_username: Option<String>,
    /// Password for authenticating to the sentinels, distinct from the data-node password.
    /// Only applied when the built config uses a sentinel server.
    #[cfg(feature = "sentinel-auth")]
    sentinel_password: Option<String>,
    /// Explicit TLS configuration (`Config.tls`).
    #[cfg(any(
        feature = "tls-rustls",
        feature = "tls-rustls-ring",
        feature = "tls-native-tls"
    ))]
    tls: Option<TlsConfig>,
    /// Dynamic/rotating credential provider (`Config.credential_provider`).
    #[cfg(feature = "credential-provider")]
    credential_provider: Option<Arc<dyn CredentialProvider>>,
}
107
108// Redacts secrets: passwords never appear, and TLS / credential-provider show only presence. The
109// usernames are identifiers (not secrets) and are kept to aid debugging.
110impl std::fmt::Debug for AuthConfig {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        let mut s = f.debug_struct("AuthConfig");
113        s.field("username", &self.username);
114        s.field("password", &self.password.as_ref().map(|_| "<redacted>"));
115        #[cfg(feature = "sentinel-auth")]
116        {
117            s.field("sentinel_username", &self.sentinel_username);
118            s.field(
119                "sentinel_password",
120                &self.sentinel_password.as_ref().map(|_| "<redacted>"),
121            );
122        }
123        #[cfg(any(
124            feature = "tls-rustls",
125            feature = "tls-rustls-ring",
126            feature = "tls-native-tls"
127        ))]
128        s.field("tls", &self.tls.as_ref().map(|_| "<configured>"));
129        #[cfg(feature = "credential-provider")]
130        s.field(
131            "credential_provider",
132            &self.credential_provider.as_ref().map(|_| "<configured>"),
133        );
134        s.finish()
135    }
136}
137
/// A Redis broker handle backed by a `fred` connection [`Pool`].
///
/// Construct it synchronously with [`RedisBroker::standalone`] and let the runtime connect it at
/// startup, or eagerly with [`RedisBroker::connect`] / [`RedisBroker::from_pool`]. The handle is
/// cheap to clone, and clones share one pool. Subscriptions are opened through
/// [`RedisBroker::subscribe`] with a [`RedisStream`] descriptor.
///
/// # Lazy connection
///
/// [`standalone`](Self::standalone) performs no I/O: it only records the server address. The pool
/// is opened by [`Broker::connect`], which the runtime calls once at startup, so a Redis service
/// can be built with the synchronous `#[ruststream::app]` macro. Publishers handed out before
/// `connect` resolve the shared pool on first use; operations that need it before `connect` return
/// [`RedisError::NotConnected`].
///
/// # Examples
///
/// ```no_run
/// use ruststream_fred::{RedisBroker, RedisStream};
///
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
/// let broker = RedisBroker::connect("redis://localhost:6379").await?;
/// let publisher = broker.publisher();
/// let sub = broker.subscribe(RedisStream::new("orders").group("workers")).await?;
/// # let _ = (publisher, sub);
/// broker.shutdown_pool().await;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct RedisBroker {
    /// The shared connection pool. The cell is empty until `Broker::connect` fills it (or is
    /// pre-filled by `from_pool`); the `Arc` is what lets clones and publishers share it.
    pool: Arc<OnceCell<Pool>>,
    /// Where and how to connect; turned into a `fred` config when connecting.
    topology: Topology,
    /// Number of connections handed to `Pool::new` when the pool is built.
    pool_size: usize,
    /// Consumer group used by bare-string subscriptions; `None` makes those fail.
    default_group: Option<String>,
    /// Credentials / TLS settings folded into the `fred` config at connect time.
    auth: AuthConfig,
}
175
176impl std::fmt::Debug for RedisBroker {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        f.debug_struct("RedisBroker")
179            .field("topology", &self.topology)
180            .field("pool_size", &self.pool_size)
181            .field("default_group", &self.default_group)
182            .field("auth", &self.auth)
183            .finish_non_exhaustive()
184    }
185}
186
187impl RedisBroker {
188    /// Creates a standalone-topology broker that connects to `url` when [`Broker::connect`] runs.
189    ///
190    /// Synchronous and performs no I/O, so it slots into the `#[ruststream::app]` builder; the
191    /// connection is opened lazily at startup. See the [type docs](Self#lazy-connection).
192    #[must_use]
193    pub fn standalone(url: impl Into<String>) -> Self {
194        Self::with_topology(Topology::Standalone(url.into()))
195    }
196
197    /// Creates a Redis Cluster broker from one or more `host:port` seed nodes.
198    ///
199    /// Only one reachable node is needed; `fred` discovers the rest of the cluster on connect.
200    /// Synchronous and performs no I/O. See the [type docs](Self#lazy-connection).
201    #[must_use]
202    pub fn cluster(nodes: impl IntoIterator<Item = impl Into<String>>) -> Self {
203        Self::with_topology(Topology::Cluster(
204            nodes.into_iter().map(Into::into).collect(),
205        ))
206    }
207
208    /// Creates a Sentinel-backed broker that tracks the primary named `service`, discovering it
209    /// through the given sentinel `host:port` addresses.
210    ///
211    /// Synchronous and performs no I/O. See the [type docs](Self#lazy-connection).
212    #[must_use]
213    pub fn sentinel(
214        service: impl Into<String>,
215        sentinels: impl IntoIterator<Item = impl Into<String>>,
216    ) -> Self {
217        Self::with_topology(Topology::Sentinel {
218            service: service.into(),
219            hosts: sentinels.into_iter().map(Into::into).collect(),
220        })
221    }
222
223    fn with_topology(topology: Topology) -> Self {
224        Self {
225            pool: Arc::new(OnceCell::new()),
226            topology,
227            pool_size: DEFAULT_POOL_SIZE,
228            default_group: None,
229            auth: AuthConfig::default(),
230        }
231    }
232
    /// Sets the connection-pool size. Defaults to 4.
    ///
    /// The value is only read when [`Broker::connect`] builds the pool, so set it before
    /// connecting. It has no effect on a broker created with [`from_pool`](Self::from_pool),
    /// whose pool already exists.
    #[must_use]
    pub const fn pool(mut self, size: usize) -> Self {
        self.pool_size = size;
        self
    }
239
240    /// Sets a broker-wide default consumer group, enabling the bare-string `#[subscriber("key")]`
241    /// form (Redis Streams always read through a group). Without it a bare-string subscription
242    /// returns [`RedisError::InvalidOptions`]; name the group per subscription with
243    /// [`RedisStream::group`] instead.
244    #[must_use]
245    pub fn default_group(mut self, group: impl Into<String>) -> Self {
246        self.default_group = Some(group.into());
247        self
248    }
249
250    /// Sets the ACL `username` and `password` used to authenticate on connect, applied on every
251    /// topology (standalone, cluster, sentinel).
252    ///
253    /// This maps onto `fred`'s `Config.username` / `Config.password`, so authentication works
254    /// beyond the standalone `redis://user:pass@host` URL, which the bare `cluster` / `sentinel`
255    /// seed lists cannot express. Credentials set here override any in a standalone URL.
256    ///
257    /// For a password-only `AUTH` (the legacy `requirepass`, no ACL user) use
258    /// [`password`](Self::password).
259    ///
260    /// # Examples
261    ///
262    /// ```no_run
263    /// use ruststream_fred::RedisBroker;
264    ///
265    /// let broker = RedisBroker::cluster(["10.0.0.1:6379"]).credentials("worker", "s3cr3t");
266    /// # let _ = broker;
267    /// ```
268    #[must_use]
269    pub fn credentials(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
270        self.auth.username = Some(username.into());
271        self.auth.password = Some(password.into());
272        self
273    }
274
275    /// Sets a password-only `AUTH` (no ACL username; the legacy `requirepass` form), on every
276    /// topology. Use [`credentials`](Self::credentials) for an ACL user plus password.
277    ///
278    /// # Examples
279    ///
280    /// ```no_run
281    /// use ruststream_fred::RedisBroker;
282    ///
283    /// let broker = RedisBroker::sentinel("mymaster", ["10.0.0.1:26379"]).password("s3cr3t");
284    /// # let _ = broker;
285    /// ```
286    #[must_use]
287    pub fn password(mut self, password: impl Into<String>) -> Self {
288        self.auth.password = Some(password.into());
289        self
290    }
291
292    /// Sets the TLS configuration used on connect, on every topology. Accepts a `fred`
293    /// [`TlsConfig`] or anything convertible into one (for example a `TlsConnector`).
294    ///
295    /// Available behind the `tls-rustls`, `tls-rustls-ring`, or `tls-native-tls` feature; a
296    /// standalone broker can also enable TLS through a `rediss://` / `valkeys://` URL. The
297    /// `fred` re-exports [`TlsConfig`](crate::TlsConfig) / [`TlsConnector`](crate::TlsConnector)
298    /// provide `default_rustls()` / `default_native_tls()` shorthands for system-trust setups.
299    ///
300    /// # Examples
301    ///
302    /// ```no_run
303    /// use ruststream_fred::{RedisBroker, TlsConfig};
304    ///
305    /// fn build(tls: TlsConfig) -> RedisBroker {
306    ///     RedisBroker::cluster(["10.0.0.1:6379"]).tls(tls)
307    /// }
308    /// ```
309    #[cfg(any(
310        feature = "tls-rustls",
311        feature = "tls-rustls-ring",
312        feature = "tls-native-tls"
313    ))]
314    #[must_use]
315    pub fn tls(mut self, tls: impl Into<TlsConfig>) -> Self {
316        self.auth.tls = Some(tls.into());
317        self
318    }
319
320    /// Sets distinct credentials for authenticating to the sentinel nodes, separate from the
321    /// data-node [`credentials`](Self::credentials). Only meaningful on the sentinel topology.
322    ///
323    /// Available behind the `sentinel-auth` feature.
324    ///
325    /// # Examples
326    ///
327    /// ```no_run
328    /// use ruststream_fred::RedisBroker;
329    ///
330    /// let broker = RedisBroker::sentinel("mymaster", ["10.0.0.1:26379"])
331    ///     .credentials("worker", "data-pass")
332    ///     .sentinel_credentials("sentinel-user", "sentinel-pass");
333    /// # let _ = broker;
334    /// ```
335    #[cfg(feature = "sentinel-auth")]
336    #[must_use]
337    pub fn sentinel_credentials(
338        mut self,
339        username: impl Into<String>,
340        password: impl Into<String>,
341    ) -> Self {
342        self.auth.sentinel_username = Some(username.into());
343        self.auth.sentinel_password = Some(password.into());
344        self
345    }
346
347    /// Sets a password-only credential for authenticating to the sentinel nodes. Use
348    /// [`sentinel_credentials`](Self::sentinel_credentials) for an ACL user plus password.
349    ///
350    /// Available behind the `sentinel-auth` feature.
351    ///
352    /// # Examples
353    ///
354    /// ```no_run
355    /// use ruststream_fred::RedisBroker;
356    ///
357    /// let broker = RedisBroker::sentinel("mymaster", ["10.0.0.1:26379"])
358    ///     .sentinel_password("sentinel-pass");
359    /// # let _ = broker;
360    /// ```
361    #[cfg(feature = "sentinel-auth")]
362    #[must_use]
363    pub fn sentinel_password(mut self, password: impl Into<String>) -> Self {
364        self.auth.sentinel_password = Some(password.into());
365        self
366    }
367
368    /// Sets a dynamic credential provider that supplies (and can rotate) the username/password on
369    /// each `AUTH` / `HELLO`, for IAM-style auth. Takes precedence over static
370    /// [`credentials`](Self::credentials).
371    ///
372    /// Available behind the `credential-provider` feature.
373    ///
374    /// # Examples
375    ///
376    /// ```no_run
377    /// use std::sync::Arc;
378    /// use ruststream_fred::{CredentialProvider, RedisBroker};
379    ///
380    /// fn build(provider: Arc<dyn CredentialProvider>) -> RedisBroker {
381    ///     RedisBroker::standalone("redis://localhost:6379").credential_provider(provider)
382    /// }
383    /// ```
384    #[cfg(feature = "credential-provider")]
385    #[must_use]
386    pub fn credential_provider(mut self, provider: Arc<dyn CredentialProvider>) -> Self {
387        self.auth.credential_provider = Some(provider);
388        self
389    }
390
391    /// Connects to a standalone Redis server eagerly, returning an already-connected broker.
392    ///
393    /// # Errors
394    ///
395    /// Returns [`RedisError::Connect`] when the URL is invalid or the connection cannot be
396    /// established.
397    pub async fn connect(url: impl Into<String>) -> Result<Self, RedisError> {
398        let broker = Self::standalone(url);
399        Broker::connect(&broker).await?;
400        Ok(broker)
401    }
402
403    /// Wraps an already-connected `fred` pool. Useful for advanced configuration (TLS, cluster,
404    /// sentinel, custom performance and reconnection policies).
405    #[must_use]
406    pub fn from_pool(pool: Pool) -> Self {
407        Self {
408            pool: Arc::new(OnceCell::new_with(Some(pool))),
409            topology: Topology::Preconnected,
410            pool_size: DEFAULT_POOL_SIZE,
411            default_group: None,
412            auth: AuthConfig::default(),
413        }
414    }
415
416    /// Builds the `fred` config for this broker's topology, then folds in the auth/TLS settings.
417    fn build_config(&self) -> Result<Config, RedisError> {
418        let mut config = match &self.topology {
419            Topology::Standalone(url) => {
420                Config::from_url(url).map_err(|err| RedisError::Connect(Box::new(err)))?
421            }
422            Topology::Cluster(nodes) => {
423                let hosts = parse_servers(nodes, 6379)?;
424                Config {
425                    server: ServerConfig::new_clustered(hosts),
426                    ..Config::default()
427                }
428            }
429            Topology::Sentinel { service, hosts } => {
430                let hosts = parse_servers(hosts, 26379)?;
431                Config {
432                    server: ServerConfig::new_sentinel(hosts, service.clone()),
433                    ..Config::default()
434                }
435            }
436            // A preconnected pool never reaches connect()'s init path.
437            Topology::Preconnected => return Err(RedisError::NotConnected),
438        };
439        self.apply_auth(&mut config);
440        Ok(config)
441    }
442
443    /// Folds the recorded auth/TLS settings into `config`. Each setting is applied only when set,
444    /// so credentials carried by a standalone URL survive unless explicitly overridden.
445    fn apply_auth(&self, config: &mut Config) {
446        if self.auth.username.is_some() {
447            config.username.clone_from(&self.auth.username);
448        }
449        if self.auth.password.is_some() {
450            config.password.clone_from(&self.auth.password);
451        }
452        #[cfg(any(
453            feature = "tls-rustls",
454            feature = "tls-rustls-ring",
455            feature = "tls-native-tls"
456        ))]
457        if self.auth.tls.is_some() {
458            config.tls.clone_from(&self.auth.tls);
459        }
460        #[cfg(feature = "credential-provider")]
461        if self.auth.credential_provider.is_some() {
462            config
463                .credential_provider
464                .clone_from(&self.auth.credential_provider);
465        }
466        #[cfg(feature = "sentinel-auth")]
467        if let ServerConfig::Sentinel {
468            username, password, ..
469        } = &mut config.server
470        {
471            if self.auth.sentinel_username.is_some() {
472                username.clone_from(&self.auth.sentinel_username);
473            }
474            if self.auth.sentinel_password.is_some() {
475                password.clone_from(&self.auth.sentinel_password);
476            }
477        }
478    }
479
480    /// The connected pool, or [`RedisError::NotConnected`] when `connect` has not run yet.
481    fn connected(&self) -> Result<Pool, RedisError> {
482        self.pool.get().cloned().ok_or(RedisError::NotConnected)
483    }
484
485    /// Returns a clone of the underlying pool. Useful for advanced operations not covered by the
486    /// wrapper.
487    ///
488    /// # Panics
489    ///
490    /// Panics if the broker has not connected yet (built with [`standalone`](Self::standalone) and
491    /// [`Broker::connect`] not run). Call it after startup, or build with [`connect`](Self::connect)
492    /// / [`from_pool`](Self::from_pool).
493    #[must_use]
494    pub fn pool_handle(&self) -> Pool {
495        self.pool
496            .get()
497            .cloned()
498            .expect("RedisBroker::pool_handle() called before connect()")
499    }
500
501    /// Opens a stream subscription described by `def`.
502    ///
503    /// Ensures the consumer group exists (`XGROUP CREATE ... MKSTREAM`, ignoring an
504    /// already-existing group) before returning the subscriber.
505    ///
506    /// # Errors
507    ///
508    /// Returns [`RedisError::NotConnected`] when the broker has not connected,
509    /// [`RedisError::InvalidOptions`] when `def` names no consumer group, or
510    /// [`RedisError::Subscribe`] when the group cannot be created.
511    pub async fn subscribe(&self, def: RedisStream) -> Result<RedisSubscriber, RedisError> {
512        let pool = self.connected()?;
513        let group = def.group_or_err()?.to_owned();
514        let consumer = def.consumer_or_auto();
515        ensure_group(&pool, def.key(), &group, def.start().as_id()).await?;
516        Ok(RedisSubscriber::new(
517            pool,
518            def.key().to_owned(),
519            group,
520            consumer,
521            def.count_or_default(),
522            def.block_or_default(),
523            def.mode(),
524            def.poison_policy(),
525            def.delay_config(),
526        ))
527    }
528
529    /// Returns a publisher bound to this broker.
530    ///
531    /// It may be created before [`Broker::connect`] (for example inside the `with_broker` builder);
532    /// it resolves the shared pool when it first publishes.
533    #[must_use]
534    pub fn publisher(&self) -> RedisPublisher {
535        RedisPublisher::new(Arc::clone(&self.pool), self.supports_transactions())
536    }
537
538    /// Whether this topology can offer multi-key transactions. Cluster cannot (buffered keys may
539    /// hash to different nodes), so its publishers reject `begin_transaction`.
540    const fn supports_transactions(&self) -> bool {
541        !matches!(self.topology, Topology::Cluster(_))
542    }
543
544    /// Builds and connects a dedicated `fred` client (used for Pub/Sub, which needs an isolated
545    /// message stream and channel state per subscriber).
546    async fn new_client(&self) -> Result<Client, RedisError> {
547        let config = self.build_config()?;
548        let client = Client::new(config, None, None, None);
549        client
550            .init()
551            .await
552            .map_err(|err| RedisError::Connect(Box::new(err)))?;
553        Ok(client)
554    }
555
556    /// Opens a Pub/Sub subscription described by `def` on a dedicated client.
557    ///
558    /// # Errors
559    ///
560    /// Returns [`RedisError::InvalidOptions`] for an invalid mode/pattern combination,
561    /// [`RedisError::Connect`] when the dedicated client cannot connect, or
562    /// [`RedisError::Subscribe`] when the subscribe command fails.
563    pub async fn subscribe_pubsub(
564        &self,
565        def: RedisPubSub,
566    ) -> Result<RedisPubSubSubscriber, RedisError> {
567        def.validate()?;
568        let codec = def.codec_handle();
569        let client = self.new_client().await?;
570        let channel = def.channel().to_owned();
571        let result = match (def.delivery_mode(), def.is_pattern()) {
572            (PubSubMode::Classic, true) => client.psubscribe(channel).await,
573            (PubSubMode::Classic, false) => client.subscribe(channel).await,
574            (PubSubMode::Sharded, _) => client.ssubscribe(channel).await,
575        };
576        result.map_err(RedisError::subscribe)?;
577        let rx = client.message_rx();
578        Ok(RedisPubSubSubscriber::new(client, rx, codec))
579    }
580
581    /// Opens a list (work-queue) subscription described by `def`.
582    ///
583    /// # Errors
584    ///
585    /// Returns [`RedisError::NotConnected`] when the broker has not connected, or
586    /// [`RedisError::InvalidOptions`] when `def` names a recovery ZSET without a `min_idle`.
587    #[allow(
588        clippy::unused_async,
589        reason = "async for parity with the other subscribe methods and the SubscriptionSource shape"
590    )]
591    pub async fn subscribe_list(&self, def: RedisList) -> Result<RedisListSubscriber, RedisError> {
592        let pool = self.connected()?;
593        let recovery = def.recovery_config()?;
594        Ok(RedisListSubscriber::new(
595            pool,
596            def.key().to_owned(),
597            def.is_reliable(),
598            def.processing_or_default(),
599            def.block_or_default(),
600            def.codec_handle(),
601            def.poison_policy(),
602            recovery,
603        ))
604    }
605
606    /// Returns a Pub/Sub publisher (classic mode by default; override with
607    /// [`RedisPubSubPublisher::mode`]).
608    #[must_use]
609    pub fn pubsub_publisher(&self) -> RedisPubSubPublisher {
610        RedisPubSubPublisher::new(Arc::clone(&self.pool), PubSubMode::Classic)
611    }
612
613    /// Returns a list publisher (`LPUSH`).
614    #[must_use]
615    pub fn list_publisher(&self) -> RedisListPublisher {
616        RedisListPublisher::new(Arc::clone(&self.pool))
617    }
618
619    /// Closes the underlying pool. A no-op if the broker never connected.
620    pub async fn shutdown_pool(&self) {
621        if let Some(pool) = self.pool.get() {
622            let _ = pool.quit().await;
623        }
624    }
625}
626
627/// Creates the consumer group, treating an already-existing group as success.
628async fn ensure_group(
629    pool: &Pool,
630    key: &str,
631    group: &str,
632    start_id: &str,
633) -> Result<(), RedisError> {
634    let result: Result<String, fred::error::Error> =
635        pool.xgroup_create(key, group, start_id, true).await;
636    match result {
637        Ok(_) => Ok(()),
638        // BUSYGROUP: the group already exists, which is the steady-state case.
639        Err(err) if err.details().contains("BUSYGROUP") => Ok(()),
640        Err(err) => Err(RedisError::subscribe(err)),
641    }
642}
643
644impl Broker for RedisBroker {
645    type Error = RedisError;
646
647    async fn connect(&self) -> Result<(), Self::Error> {
648        self.pool
649            .get_or_try_init(|| async {
650                let config = self.build_config()?;
651                let pool = Pool::new(config, None, None, None, self.pool_size)
652                    .map_err(|err| RedisError::Connect(Box::new(err)))?;
653                pool.init()
654                    .await
655                    .map_err(|err| RedisError::Connect(Box::new(err)))?;
656                Ok(pool)
657            })
658            .await?;
659        Ok(())
660    }
661
662    async fn shutdown(&self) -> Result<(), Self::Error> {
663        self.shutdown_pool().await;
664        Ok(())
665    }
666}
667
668// By-name subscription capability for the bare string `#[subscriber("key")]` form. Redis Streams
669// always read through a consumer group, so this requires a broker-wide default group.
670#[allow(clippy::use_self)]
671impl Subscribe for RedisBroker {
672    type Subscriber = RedisSubscriber;
673
674    async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error> {
675        let group = self.default_group.clone().ok_or_else(|| {
676            RedisError::InvalidOptions(format!(
677                "bare-string subscription on `{name}` needs a broker-wide default group: \
678                 call RedisBroker::default_group(name), or subscribe with \
679                 RedisStream::new(name).group(group)"
680            ))
681        })?;
682        RedisBroker::subscribe(self, RedisStream::new(name).group(group)).await
683    }
684}
685
686/// `DescribeServer` reports the configured Redis address (the first seed for cluster/sentinel).
687impl DescribeServer for RedisBroker {
688    fn describe_server(&self) -> ServerSpec {
689        let host = match &self.topology {
690            Topology::Standalone(url) => url
691                .trim_start_matches("rediss://")
692                .trim_start_matches("redis://")
693                .to_owned(),
694            Topology::Cluster(nodes) => nodes.first().cloned().unwrap_or_default(),
695            Topology::Sentinel { hosts, .. } => hosts.first().cloned().unwrap_or_default(),
696            Topology::Preconnected => String::new(),
697        };
698        ServerSpec::new(host, "redis")
699    }
700}
701
// Unit tests for broker construction and config building. None of them needs a running Redis
// server: they exercise only the unconnected broker and `build_config`.
#[cfg(test)]
mod tests {
    use ruststream::{OutgoingMessage, Publisher};

    use super::*;

    // `standalone` records the address without connecting, so operations needing the connection
    // fail cleanly until `Broker::connect` runs. No server required.
    #[tokio::test]
    async fn standalone_does_not_connect() {
        let broker = RedisBroker::standalone("redis://127.0.0.1:6379");

        let publish_err = broker
            .publisher()
            .publish(OutgoingMessage::new("orders", b"{}".as_slice()))
            .await
            .unwrap_err();
        assert!(matches!(publish_err, RedisError::NotConnected));

        let subscribe_err = broker
            .subscribe(RedisStream::new("orders").group("g"))
            .await
            .unwrap_err();
        assert!(matches!(subscribe_err, RedisError::NotConnected));
    }

    // The by-name `Subscribe` impl rejects a broker without a default group before it looks at
    // the connection, so this fails with `InvalidOptions` rather than `NotConnected`.
    #[tokio::test]
    async fn bare_string_subscription_needs_default_group() {
        let broker = RedisBroker::standalone("redis://127.0.0.1:6379");
        let err = Subscribe::subscribe(&broker, "orders").await.unwrap_err();
        assert!(matches!(err, RedisError::InvalidOptions(msg) if msg.contains("default group")));
    }

    // The server description carries the address without its scheme, and the `redis` protocol.
    #[test]
    fn describe_server_reports_redis() {
        let broker = RedisBroker::standalone("redis://localhost:6379");
        let spec = broker.describe_server();
        assert_eq!(spec.protocol, "redis");
        assert_eq!(spec.host, "localhost:6379");
    }

    // Credentials must reach the fred config on every topology, not just the standalone URL.
    #[test]
    fn credentials_apply_to_all_topologies() {
        let brokers = [
            RedisBroker::standalone("redis://localhost:6379").credentials("alice", "s3cr3t"),
            RedisBroker::cluster(["127.0.0.1:7000"]).credentials("alice", "s3cr3t"),
            RedisBroker::sentinel("mymaster", ["127.0.0.1:26379"]).credentials("alice", "s3cr3t"),
        ];
        for broker in brokers {
            let config = broker.build_config().expect("config builds");
            assert_eq!(config.username.as_deref(), Some("alice"));
            assert_eq!(config.password.as_deref(), Some("s3cr3t"));
        }
    }

    // `password` alone must not invent a username.
    #[test]
    fn password_only_sets_password_without_username() {
        let config = RedisBroker::cluster(["127.0.0.1:7000"])
            .password("requirepass")
            .build_config()
            .expect("config builds");
        assert_eq!(config.username, None);
        assert_eq!(config.password.as_deref(), Some("requirepass"));
    }

    // Programmatic credentials win over a standalone URL's userinfo.
    #[test]
    fn programmatic_credentials_override_standalone_url() {
        let config = RedisBroker::standalone("redis://urluser:urlpass@localhost:6379")
            .credentials("acluser", "aclpass")
            .build_config()
            .expect("config builds");
        assert_eq!(config.username.as_deref(), Some("acluser"));
        assert_eq!(config.password.as_deref(), Some("aclpass"));
    }

    // Without an override the URL's credentials are left untouched.
    #[test]
    fn url_credentials_preserved_without_override() {
        let config = RedisBroker::standalone("redis://urluser:urlpass@localhost:6379")
            .build_config()
            .expect("config builds");
        assert_eq!(config.username.as_deref(), Some("urluser"));
        assert_eq!(config.password.as_deref(), Some("urlpass"));
    }

    // A password set through `credentials` must never show up in the broker's `Debug` output.
    #[test]
    fn debug_redacts_password() {
        let broker =
            RedisBroker::standalone("redis://localhost:6379").credentials("alice", "s3cr3t");
        let rendered = format!("{broker:?}");
        assert!(
            !rendered.contains("s3cr3t"),
            "password must not appear in Debug output: {rendered}"
        );
        // The username is an identifier, not a secret, and is kept for debugging.
        assert!(
            rendered.contains("alice"),
            "expected username in: {rendered}"
        );
    }

    // Sentinel credentials go onto the sentinel server config and leave the data-node
    // credentials on the top-level config alone.
    #[cfg(feature = "sentinel-auth")]
    #[test]
    fn sentinel_credentials_apply_to_sentinel_server() {
        let config = RedisBroker::sentinel("mymaster", ["127.0.0.1:26379"])
            .credentials("datauser", "datapass")
            .sentinel_credentials("sentineluser", "sentinelpass")
            .build_config()
            .expect("config builds");
        // Data-node credentials sit on the top-level config.
        assert_eq!(config.username.as_deref(), Some("datauser"));
        let ServerConfig::Sentinel {
            username, password, ..
        } = &config.server
        else {
            panic!("expected a sentinel server config");
        };
        assert_eq!(username.as_deref(), Some("sentineluser"));
        assert_eq!(password.as_deref(), Some("sentinelpass"));
    }

    // Test double: a provider that always returns the same username/password pair.
    #[cfg(feature = "credential-provider")]
    #[derive(Debug)]
    struct StaticCredentials;

    #[cfg(feature = "credential-provider")]
    #[async_trait::async_trait]
    impl CredentialProvider for StaticCredentials {
        async fn fetch(
            &self,
            _server: Option<&fred::types::config::Server>,
        ) -> Result<(Option<String>, Option<String>), fred::error::Error> {
            Ok((Some("rotating".into()), Some("token".into())))
        }
    }

    // A provider set on the builder must be present on the built config.
    #[cfg(feature = "credential-provider")]
    #[test]
    fn credential_provider_is_applied() {
        let provider: Arc<dyn CredentialProvider> = Arc::new(StaticCredentials);
        let config = RedisBroker::cluster(["127.0.0.1:7000"])
            .credential_provider(provider)
            .build_config()
            .expect("config builds");
        assert!(config.credential_provider.is_some());
    }
}