Skip to main content

RedisBroker

Struct RedisBroker 

Source
pub struct RedisBroker { /* private fields */ }
Expand description

A Redis broker handle backed by a fred connection Pool.

Construct it synchronously with RedisBroker::standalone and let the runtime connect it at startup, or eagerly with RedisBroker::connect / RedisBroker::from_pool. The handle is cheap to clone, and clones share one pool. Subscriptions are opened through RedisBroker::subscribe with a RedisStream descriptor.

§Lazy connection

standalone performs no I/O: it only records the server address. The pool is opened by Broker::connect, which the runtime calls once at startup, so a Redis service can be built with the synchronous #[ruststream::app] macro. Publishers handed out before connect resolve the shared pool on first use; operations that need it before connect return RedisError::NotConnected.

§Examples

use ruststream_fred::{RedisBroker, RedisStream};

let broker = RedisBroker::connect("redis://localhost:6379").await?;
let publisher = broker.publisher();
let sub = broker.subscribe(RedisStream::new("orders").group("workers")).await?;
broker.shutdown_pool().await;

Implementations§

Source§

impl RedisBroker

Source

pub fn standalone(url: impl Into<String>) -> Self

Creates a standalone-topology broker that connects to url when Broker::connect runs.

Synchronous and performs no I/O, so it slots into the #[ruststream::app] builder; the connection is opened lazily at startup. See the type docs.

Source

pub fn cluster(nodes: impl IntoIterator<Item = impl Into<String>>) -> Self

Creates a Redis Cluster broker from one or more host:port seed nodes.

Only one reachable node is needed; fred discovers the rest of the cluster on connect. Synchronous and performs no I/O. See the type docs.

Source

pub fn sentinel( service: impl Into<String>, sentinels: impl IntoIterator<Item = impl Into<String>>, ) -> Self

Creates a Sentinel-backed broker that tracks the primary named service, discovering it through the given sentinel host:port addresses.

Synchronous and performs no I/O. See the type docs.

Source

pub const fn pool(self, size: usize) -> Self

Sets the connection-pool size. Defaults to 4.

Source

pub fn default_group(self, group: impl Into<String>) -> Self

Sets a broker-wide default consumer group, enabling the bare-string #[subscriber("key")] form (Redis Streams always read through a group). Without it a bare-string subscription returns RedisError::InvalidOptions; name the group per subscription with RedisStream::group instead.

Source

pub fn credentials( self, username: impl Into<String>, password: impl Into<String>, ) -> Self

Sets the ACL username and password used to authenticate on connect, applied on every topology (standalone, cluster, sentinel).

This maps onto fred’s Config.username / Config.password, so authentication works beyond the standalone redis://user:pass@host URL, which the bare cluster / sentinel seed lists cannot express. Credentials set here override any in a standalone URL.

For a password-only AUTH (the legacy requirepass, no ACL user) use password.

§Examples
use ruststream_fred::RedisBroker;

let broker = RedisBroker::cluster(["10.0.0.1:6379"]).credentials("worker", "s3cr3t");
Source

pub fn password(self, password: impl Into<String>) -> Self

Sets a password-only AUTH (no ACL username; the legacy requirepass form), on every topology. Use credentials for an ACL user plus password.

§Examples
use ruststream_fred::RedisBroker;

let broker = RedisBroker::sentinel("mymaster", ["10.0.0.1:26379"]).password("s3cr3t");
Source

pub fn tls(self, tls: impl Into<TlsConfig>) -> Self

Sets the TLS configuration used on connect, on every topology. Accepts a fred TlsConfig or anything convertible into one (for example a TlsConnector).

Available behind the tls-rustls, tls-rustls-ring, or tls-native-tls feature; a standalone broker can also enable TLS through a rediss:// / valkeys:// URL. The fred re-exports TlsConfig / TlsConnector provide default_rustls() / default_native_tls() shorthands for system-trust setups.

§Examples
use ruststream_fred::{RedisBroker, TlsConfig};

fn build(tls: TlsConfig) -> RedisBroker {
    RedisBroker::cluster(["10.0.0.1:6379"]).tls(tls)
}
Source

pub fn sentinel_credentials( self, username: impl Into<String>, password: impl Into<String>, ) -> Self

Sets distinct credentials for authenticating to the sentinel nodes, separate from the data-node credentials. Only meaningful on the sentinel topology.

Available behind the sentinel-auth feature.

§Examples
use ruststream_fred::RedisBroker;

let broker = RedisBroker::sentinel("mymaster", ["10.0.0.1:26379"])
    .credentials("worker", "data-pass")
    .sentinel_credentials("sentinel-user", "sentinel-pass");
Source

pub fn sentinel_password(self, password: impl Into<String>) -> Self

Sets a password-only credential for authenticating to the sentinel nodes. Use sentinel_credentials for an ACL user plus password.

Available behind the sentinel-auth feature.

§Examples
use ruststream_fred::RedisBroker;

let broker = RedisBroker::sentinel("mymaster", ["10.0.0.1:26379"])
    .sentinel_password("sentinel-pass");
Source

pub fn credential_provider(self, provider: Arc<dyn CredentialProvider>) -> Self

Sets a dynamic credential provider that supplies (and can rotate) the username/password on each AUTH / HELLO, for IAM-style auth. Takes precedence over static credentials.

Available behind the credential-provider feature.

§Examples
use std::sync::Arc;
use ruststream_fred::{CredentialProvider, RedisBroker};

fn build(provider: Arc<dyn CredentialProvider>) -> RedisBroker {
    RedisBroker::standalone("redis://localhost:6379").credential_provider(provider)
}
Source

pub async fn connect(url: impl Into<String>) -> Result<Self, RedisError>

Connects to a standalone Redis server eagerly, returning an already-connected broker.

§Errors

Returns RedisError::Connect when the URL is invalid or the connection cannot be established.

Source

pub fn from_pool(pool: Pool) -> Self

Wraps an already-connected fred pool. Useful for advanced configuration (TLS, cluster, sentinel, custom performance and reconnection policies).

Source

pub fn pool_handle(&self) -> Pool

Returns a clone of the underlying pool. Useful for advanced operations not covered by the wrapper.

§Panics

Panics if the broker has not connected yet (built with standalone and Broker::connect not run). Call it after startup, or build with connect / from_pool.

Source

pub async fn subscribe( &self, def: RedisStream, ) -> Result<RedisSubscriber, RedisError>

Opens a stream subscription described by def.

Ensures the consumer group exists (XGROUP CREATE ... MKSTREAM, ignoring an already-existing group) before returning the subscriber.

§Errors

Returns RedisError::NotConnected when the broker has not connected, RedisError::InvalidOptions when def names no consumer group, or RedisError::Subscribe when the group cannot be created.

Source

pub fn publisher(&self) -> RedisPublisher

Returns a publisher bound to this broker.

It may be created before Broker::connect (for example inside the with_broker builder); it resolves the shared pool when it first publishes.

Source

pub async fn subscribe_pubsub( &self, def: RedisPubSub, ) -> Result<RedisPubSubSubscriber, RedisError>

Opens a Pub/Sub subscription described by def on a dedicated client.

§Errors

Returns RedisError::InvalidOptions for an invalid mode/pattern combination, RedisError::Connect when the dedicated client cannot connect, or RedisError::Subscribe when the subscribe command fails.

Source

pub async fn subscribe_list( &self, def: RedisList, ) -> Result<RedisListSubscriber, RedisError>

Opens a list (work-queue) subscription described by def.

§Errors

Returns RedisError::NotConnected when the broker has not connected, or RedisError::InvalidOptions when def names a recovery ZSET without a min_idle.

Source

pub fn pubsub_publisher(&self) -> RedisPubSubPublisher

Returns a Pub/Sub publisher (classic mode by default; override with RedisPubSubPublisher::mode).

Source

pub fn list_publisher(&self) -> RedisListPublisher

Returns a list publisher (LPUSH).

Source

pub async fn shutdown_pool(&self)

Closes the underlying pool. A no-op if the broker never connected.

Trait Implementations§

Source§

impl Broker for RedisBroker

Source§

type Error = RedisError

The error type returned by broker-level operations.
Source§

async fn connect(&self) -> Result<(), Self::Error>

Establishes the connection to the broker. Idempotent: calling multiple times must not open additional sockets. Read more
Source§

async fn shutdown(&self) -> Result<(), Self::Error>

Closes the broker connection, flushing in-flight publishes and stopping background tasks. Read more
Source§

impl Clone for RedisBroker

Source§

fn clone(&self) -> RedisBroker

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for RedisBroker

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl DescribeServer for RedisBroker

DescribeServer reports the configured Redis address (the first seed for cluster/sentinel).

Source§

fn describe_server(&self) -> ServerSpec

Returns the server coordinates for this broker.
Source§

impl Subscribe for RedisBroker

Source§

type Subscriber = RedisSubscriber

The subscriber type opened by a by-name subscription.
Source§

async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error>

Opens a subscription to name, producing this broker’s Subscriber. Read more
Source§

impl SubscriptionSource<RedisBroker> for RedisList

Source§

type Subscriber = RedisListSubscriber

The subscriber type this source opens.
Source§

fn name(&self) -> &str

The name (subject / channel) this subscription binds to. Read more
Source§

async fn subscribe( self, broker: &RedisBroker, ) -> Result<Self::Subscriber, RedisError>

Opens the subscription against broker. Called once, after Broker::connect. Read more
Source§

impl SubscriptionSource<RedisBroker> for RedisPubSub

Source§

type Subscriber = RedisPubSubSubscriber

The subscriber type this source opens.
Source§

fn name(&self) -> &str

The name (subject / channel) this subscription binds to. Read more
Source§

async fn subscribe( self, broker: &RedisBroker, ) -> Result<Self::Subscriber, RedisError>

Opens the subscription against broker. Called once, after Broker::connect. Read more
Source§

impl SubscriptionSource<RedisBroker> for RedisStream

Source§

type Subscriber = RedisSubscriber

The subscriber type this source opens.
Source§

fn name(&self) -> &str

The name (subject / channel) this subscription binds to. Read more
Source§

async fn subscribe( self, broker: &RedisBroker, ) -> Result<Self::Subscriber, RedisError>

Opens the subscription against broker. Called once, after Broker::connect. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
where ST: ?Sized, DT: ?Sized,

Source§

impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
where ST: ?Sized, DT: ?Sized,

Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Read<Exclusive, BecauseExclusive> for T
where T: ?Sized,

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more