pub struct MessageBus { /* private fields */ }
Message bus for pub/sub communication
Implementations§
impl MessageBus
pub async fn connect(redis_url: &str, node_id: &str) -> Result<Self>
Connect to Redis and create a new message bus
Uses exponential backoff retry logic for resilient connection establishment.
§Arguments
- redis_url - Redis connection URL (e.g., "redis://localhost:6379")
- node_id - Unique identifier for this node
§Example
let bus = MessageBus::connect("redis://localhost:6379", "camera-node").await?;
pub async fn connect_with_retry(redis_url: &str, node_id: &str, retry_config: RetryConfig) -> Result<Self>
Connect to Redis with custom retry configuration
§Arguments
- redis_url - Redis connection URL
- node_id - Unique identifier for this node
- retry_config - Retry configuration for exponential backoff
§Example
use std::time::Duration;

let retry_config = RetryConfig {
    max_attempts: 10,
    initial_delay: Duration::from_millis(200),
    max_delay: Duration::from_secs(30),
    multiplier: 2.0,
};
let bus = MessageBus::connect_with_retry(
    "redis://localhost:6379",
    "camera-node",
    retry_config
).await?;
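To see what the fields in the example above imply, here is a minimal sketch of an exponential backoff schedule. It is not the crate's retry loop, which is not shown on this page: the `RetryConfig` below is a local stand-in with only the four fields from the example, `backoff_schedule` is a hypothetical helper, and whether the real implementation adds jitter or counts attempts the same way is an assumption.

```rust
use std::time::Duration;

/// Local stand-in that mirrors the four fields used in the example above.
/// The crate's real `RetryConfig` may carry more fields or different types.
struct RetryConfig {
    max_attempts: u32,
    initial_delay: Duration,
    max_delay: Duration,
    multiplier: f64,
}

/// Waits between consecutive attempts: the delay starts at `initial_delay`,
/// is multiplied by `multiplier` after every failure, and is capped at `max_delay`.
fn backoff_schedule(cfg: &RetryConfig) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut delay = cfg.initial_delay.min(cfg.max_delay);
    // N attempts are separated by N - 1 waits.
    for _ in 1..cfg.max_attempts {
        delays.push(delay);
        delay = delay.mul_f64(cfg.multiplier).min(cfg.max_delay);
    }
    delays
}
```

Under these assumptions, the values from the example (10 attempts, 200 ms initial delay, multiplier 2.0, 30 s cap) give nine waits that double from 200 ms and stay at 30 s once the cap is reached.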
pub fn set_namespace(&mut self, namespace: &str)
Set the namespace for topic isolation
Namespaces allow multiple robots/fleets to use the same Redis instance without topic collisions.
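A minimal sketch of why a namespace prevents collisions, assuming the namespace is prepended to the Redis key of each topic. The key layout `<namespace>:topic:<topic>` and the helper `stream_key` are hypothetical; the layout is only loosely inferred from the `mecha10:topic:...` pattern used in the `discover_topics` example, and the crate may build its keys differently.

```rust
/// Builds a Redis key for a topic under a namespace.
/// Hypothetical layout: "<namespace>:topic:<topic>".
fn stream_key(namespace: &str, topic: &str) -> String {
    format!("{}:topic:{}", namespace, topic)
}
```

With this layout, two fleets publishing to the same topic name, for example `/scan` under `fleet-a` and `fleet-b`, end up on distinct keys and never read each other's messages.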
pub async fn subscribe<T: DeserializeOwned + Send + 'static>(&mut self, topic: &str, consumer_group: &str) -> Result<Subscriber<T>>
Subscribe to a topic with a consumer group
Consumer groups allow multiple nodes to process messages in parallel, with each message delivered to only one consumer in the group.
§Arguments
- topic - Topic name (e.g., "/scan", "/odom")
- consumer_group - Consumer group name (e.g., "processing", "logging")
§Example
let mut rx = bus.subscribe::<LaserScan>("/scan", "slam").await?;
while let Some(msg) = rx.recv().await {
    println!("Received: {:?}", msg.payload);
    msg.ack().await?;
}
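The delivery rule described above (each message goes to only one consumer within a group) can be illustrated with a toy in-memory model. This is not how the crate or Redis implements it; `Topic`, `add_group`, `publish`, and `next_for` are hypothetical names, and the model assumes that separate groups each receive their own copy of every message, as Redis Streams consumer groups do.

```rust
use std::collections::{HashMap, VecDeque};

/// Toy in-memory model of one topic with consumer groups (no Redis involved).
#[derive(Default)]
struct Topic {
    // One queue of pending messages per consumer group.
    groups: HashMap<String, VecDeque<String>>,
}

impl Topic {
    /// Registers a group so that it receives messages published afterwards.
    fn add_group(&mut self, group: &str) {
        self.groups.entry(group.to_string()).or_default();
    }

    /// Every registered group gets its own copy of the message.
    fn publish(&mut self, msg: &str) {
        for queue in self.groups.values_mut() {
            queue.push_back(msg.to_string());
        }
    }

    /// A consumer takes the next pending message of its group. Once taken,
    /// no other consumer in the same group receives that message.
    fn next_for(&mut self, group: &str) -> Option<String> {
        self.groups.get_mut(group)?.pop_front()
    }
}
```

In this model, two nodes that both call `next_for("slam")` split the messages between them, while a node in a "logging" group still sees every message.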
pub async fn subscribe_pattern<T: DeserializeOwned + Send + 'static>(&mut self, pattern: &str, consumer_group: &str) -> Result<Subscriber<T>>
Subscribe to multiple topics matching a wildcard pattern
Enables aggregated subscriptions across multiple topics using wildcard patterns.
Useful for collecting data from multiple robots (e.g., */camera/rgb subscribes
to camera feeds from all robots).
§Arguments
- pattern - Wildcard pattern with * as placeholder (e.g., "*/camera/rgb", "robot-*/scan")
- consumer_group - Consumer group name for load balancing
§Pattern Syntax
- * matches any segment in the topic path
- */camera/rgb matches "robot-1/camera/rgb", "robot-2/camera/rgb", etc.
- robot-*/scan matches "robot-1/scan", "robot-alpha/scan", etc.
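The rules above can be read as segment-wise matching, sketched below. This is an interpretation, not the crate's code: `topic_matches` and `segment_matches` are hypothetical helpers, and they assume that * never crosses a / boundary. Since the method converts the pattern to a Redis KEYS pattern, and Redis globbing lets * match / as well, the real behavior may be broader than this sketch.

```rust
/// Matches one path segment against a pattern segment in which `*`
/// stands for any run of characters (possibly empty).
fn segment_matches(pattern: &str, segment: &str) -> bool {
    let parts: Vec<&str> = pattern.split('*').collect();
    if parts.len() == 1 {
        return pattern == segment; // no wildcard: exact match required
    }
    let first = parts[0];
    let last = parts[parts.len() - 1];
    if segment.len() < first.len() + last.len()
        || !segment.starts_with(first)
        || !segment.ends_with(last)
    {
        return false;
    }
    // Literals between wildcards must appear in order in the remainder.
    let mut rest = &segment[first.len()..segment.len() - last.len()];
    for mid in &parts[1..parts.len() - 1] {
        match rest.find(*mid) {
            Some(i) => rest = &rest[i + mid.len()..],
            None => return false,
        }
    }
    true
}

/// Matches a topic path against a wildcard pattern, segment by segment.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    let p: Vec<&str> = pattern.split('/').collect();
    let t: Vec<&str> = topic.split('/').collect();
    p.len() == t.len() && p.iter().zip(&t).all(|(ps, ts)| segment_matches(ps, ts))
}
```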
§Example
// Subscribe to camera feeds from ALL robots
let mut rx = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-processing").await?;
while let Some(msg) = rx.recv().await {
    println!("Received frame from robot: {}", msg.topic);
    // Process frame from any robot
    msg.ack().await?;
}
§Implementation
This method:
- Converts wildcard pattern to Redis KEYS pattern
- Discovers all matching topics using discover_topics()
- Subscribes to each discovered topic with the same consumer group
- Multiplexes messages from all topics into a single receiver
§Note
This method discovers topics only once, at subscription time, using the Redis
KEYS command. To pick up topics that appear after subscribing, consider polling
discover_topics() periodically and creating new subscriptions.
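The polling approach suggested in the note needs a way to tell new topics from ones that are already subscribed. A minimal sketch of that bookkeeping follows; `newly_discovered` is a hypothetical helper, not part of this crate, and it only tracks topic names without touching Redis.

```rust
use std::collections::HashSet;

/// Returns the topics in `discovered` that were not known yet and
/// records them in `known`, so each topic is reported only once.
fn newly_discovered(known: &mut HashSet<String>, discovered: &[String]) -> Vec<String> {
    discovered
        .iter()
        // `insert` returns true only when the value was not present before.
        .filter(|topic| known.insert((*topic).clone()))
        .cloned()
        .collect()
}
```

One way to use it, under the same assumptions: on each timer tick, pass the result of discover_topics() to this helper and call subscribe() with the same consumer group for every topic it returns.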
pub async fn discover_topics(&self, pattern: &str) -> Result<Vec<String>>
Discover topics matching a Redis KEYS pattern
Uses Redis KEYS command to find all streams matching the given pattern. This is useful for aggregated topic subscriptions.
§Arguments
- pattern - Redis KEYS pattern (e.g., "mecha10:topic:/sensor/camera/*")
§Returns
A vector of topic paths (without the namespace prefix)
§Example
// Discover all camera topics
let topics = bus.discover_topics("mecha10:topic:/sensor/camera/*").await?;
println!("Found {} camera topics", topics.len());
§Note
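This uses the Redis KEYS command, which walks the entire keyspace and blocks the server while it runs, so it can be slow on large datasets. For production use with many topics, consider using SCAN instead, which iterates incrementally with a cursor.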
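As stated under Returns, the topic paths come back without the namespace prefix. A minimal sketch of that stripping step is shown here, assuming the prefix is the literal `mecha10:topic:` seen in the example pattern. Both the constant `TOPIC_PREFIX` and the helper `topics_from_keys` are hypothetical; the crate may derive the prefix from the configured namespace instead.

```rust
/// Hypothetical prefix, taken from the example pattern above.
const TOPIC_PREFIX: &str = "mecha10:topic:";

/// Turns raw Redis keys into topic paths by removing the prefix.
/// Keys that do not carry the prefix are skipped.
fn topics_from_keys(keys: &[&str]) -> Vec<String> {
    keys.iter()
        .filter_map(|key| key.strip_prefix(TOPIC_PREFIX))
        .map(str::to_owned)
        .collect()
}
```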
Auto Trait Implementations§
impl Freeze for MessageBus
impl !RefUnwindSafe for MessageBus
impl Send for MessageBus
impl Sync for MessageBus
impl Unpin for MessageBus
impl !UnwindSafe for MessageBus
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.