@mecha10/messaging
Redis Streams-based pub/sub messaging system for inter-node communication in the Mecha10 framework.
Features
- ✅ Redis Streams Backend - Reliable message delivery with persistence
- ✅ Type-Safe Messaging - Generic types for compile-time safety
- ✅ Consumer Groups - Load balancing across multiple nodes
- ✅ Message Acknowledgment - ack()/nack() for retry logic
- ✅ Auto-Reconnection - Automatic connection recovery
- ✅ Resilient Subscriptions - Automatic retry with exponential backoff
- ✅ Self-Healing - Auto-recovery from consumer group failures
- ✅ Namespace Support - Multi-robot fleet isolation
- ✅ Wildcard Subscriptions - Subscribe to multiple topics with patterns (e.g., */camera/rgb)
Installation
[dependencies]
mecha10-messaging = { path = "../messaging" }
tokio = { version = "1.35", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
Quick Start
Publisher
use MessageBus;
use ;
async
Subscriber
use MessageBus;
async
Core Concepts
Topics
Topics are named channels for messages, following a hierarchical naming convention:
/scan # LiDAR scans
/camera/rgb # RGB camera frames
/camera/depth # Depth camera frames
/odom # Odometry
/cmd_vel # Velocity commands
Consumer Groups
Consumer groups enable load balancing: each message is delivered to exactly one consumer in the group.
// Node 1
let mut rx1 = bus..await?;
// Node 2
let mut rx2 = bus..await?;
// Each task goes to either Node 1 OR Node 2 (not both)
Different groups receive all messages:
// SLAM node
let mut rx_slam = bus..await?;
// Logger node
let mut rx_log = bus..await?;
// Both receive the same scans
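The delivery rule described above can be modeled in memory without Redis. The sketch below is illustrative only: `Topic`, `join`, and `publish` are hypothetical names, not part of the messaging API, and the round-robin choice inside a group is a simplifying assumption (Redis hands a message to whichever consumer in the group reads next).

```rust
use std::collections::HashMap;

/// In-memory model of one topic: every group sees every message,
/// but inside a group each message goes to exactly one consumer.
struct Topic {
    // group name -> (consumer names, index of the next consumer to serve)
    groups: HashMap<String, (Vec<String>, usize)>,
}

impl Topic {
    fn new() -> Self {
        Topic { groups: HashMap::new() }
    }

    /// Registers a consumer under a group, creating the group if needed.
    fn join(&mut self, group: &str, consumer: &str) {
        let entry = self
            .groups
            .entry(group.to_string())
            .or_insert((Vec::new(), 0));
        entry.0.push(consumer.to_string());
    }

    /// Returns the (group, consumer) pairs that receive one published
    /// message, sorted by group name so the result is deterministic.
    fn publish(&mut self) -> Vec<(String, String)> {
        let mut deliveries = Vec::new();
        for (group, (consumers, next)) in self.groups.iter_mut() {
            // Assumption: round-robin stands in for "one consumer per group".
            let consumer = consumers[*next % consumers.len()].clone();
            *next += 1;
            deliveries.push((group.clone(), consumer));
        }
        deliveries.sort();
        deliveries
    }
}
```

With two consumers in one group and a third consumer in a separate group, each published message produces exactly two deliveries: one to the shared group (alternating between its members in this model) and one to the separate group.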
Message Acknowledgment
Messages must be acknowledged after processing:
while let Some = rx.recv.await
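To make the acknowledgment contract concrete, here is a small in-memory sketch of the bookkeeping involved. It is not the framework's implementation: `PendingTracker` and its methods are hypothetical names, and the assumption that a nack puts the message back at the end of the queue is ours, based only on the "retry" wording in this README.

```rust
use std::collections::{BTreeSet, VecDeque};

/// Tracks message ids that are waiting for delivery or for an ack.
struct PendingTracker {
    queue: VecDeque<u64>,   // ids not yet delivered (or returned by nack)
    pending: BTreeSet<u64>, // ids delivered but not yet acknowledged
}

impl PendingTracker {
    fn new(ids: impl IntoIterator<Item = u64>) -> Self {
        PendingTracker {
            queue: ids.into_iter().collect(),
            pending: BTreeSet::new(),
        }
    }

    /// Hands out the next id and marks it as pending.
    fn deliver(&mut self) -> Option<u64> {
        let id = self.queue.pop_front()?;
        self.pending.insert(id);
        Some(id)
    }

    /// Acknowledges an id; returns false if it was not pending.
    fn ack(&mut self, id: u64) -> bool {
        self.pending.remove(&id)
    }

    /// Rejects an id so it is delivered again later (assumed semantics).
    fn nack(&mut self, id: u64) -> bool {
        let was_pending = self.pending.remove(&id);
        if was_pending {
            self.queue.push_back(id);
        }
        was_pending
    }
}
```

The point of the model is that a message that is neither acked nor nacked stays in `pending` indefinitely, which is why every received message should end in one of the two calls.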
Namespaces
Isolate multiple robots using the same Redis instance:
let mut bus = connect.await?;
bus.set_namespace;
// Messages published to "fleet-alpha:/scan"
bus.publish.await?;
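The key layout implied by the "fleet-alpha:/scan" comment can be sketched as a pair of helper functions. These helpers are hypothetical and only illustrate the assumed `<namespace>:<topic>` format; the crate may build its keys differently.

```rust
/// Builds the stream key for a topic inside a namespace.
/// Assumption: the key is "<namespace>:<topic>", as in "fleet-alpha:/scan".
fn namespaced_key(namespace: &str, topic: &str) -> String {
    format!("{}:{}", namespace, topic)
}

/// Splits a stream key at the first ':' into (namespace, topic).
/// Returns None when the key contains no namespace separator.
fn split_key(key: &str) -> Option<(&str, &str)> {
    key.split_once(':')
}
```

Because two fleets with different namespaces never produce the same key for the same topic, their streams and consumer groups stay separate on a shared Redis instance.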
API Reference
MessageBus
Connect:
let mut bus = connect.await?;
Set namespace:
bus.set_namespace;
Publish:
bus.publish.await?;
Subscribe:
let mut rx = bus..await?;
Subscribe with wildcard pattern:
let mut rx = bus..await?;
Discover topics:
let topics = bus.discover_topics.await?;
Close:
bus.close.await?;
Message<T>
Fields:
Methods:
msg.ack.await?; // Acknowledge
msg.nack.await?; // Negative acknowledgment (retry)
Subscriber<T>
Receive message:
let msg = rx.recv.await;
Get topic:
let topic = rx.topic;
Advanced Usage
Wildcard Subscriptions
Subscribe to multiple topics at once using wildcard patterns. This is essential for remote nodes that aggregate data from multiple robots:
use MessageBus;
use ;
async
Pattern Syntax:
- */camera/rgb - Matches robot-1/camera/rgb, robot-2/camera/rgb, etc.
- robot-*/scan - Matches robot-1/scan, robot-alpha/scan, etc.
- */sensor/* - Matches any topic with "sensor" in the middle segment
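One way to read the pattern rules above is as a per-segment glob match. The following is a minimal sketch under two assumptions that this README does not confirm: `*` matches any run of characters within a single `/`-separated segment, and it never spans a `/`. The function names are hypothetical and not part of the crate's API.

```rust
/// Matches one path segment against a pattern segment in which `*`
/// stands for any run of characters (possibly empty).
fn segment_matches(pattern: &str, segment: &str) -> bool {
    let parts: Vec<&str> = pattern.split('*').collect();
    if parts.len() == 1 {
        return pattern == segment; // no wildcard: exact match
    }
    let first = parts[0];
    let last = parts[parts.len() - 1];
    if segment.len() < first.len() + last.len()
        || !segment.starts_with(first)
        || !segment.ends_with(last)
    {
        return false;
    }
    // Literals between wildcards must appear in order in the remainder.
    let mut rest = &segment[first.len()..segment.len() - last.len()];
    for part in &parts[1..parts.len() - 1] {
        match rest.find(*part) {
            Some(idx) => rest = &rest[idx + part.len()..],
            None => return false,
        }
    }
    true
}

/// Matches a full topic against a pattern, segment by segment.
/// Assumption: pattern and topic must have the same number of segments.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    let p: Vec<&str> = pattern.split('/').collect();
    let t: Vec<&str> = topic.split('/').collect();
    p.len() == t.len() && p.iter().zip(&t).all(|(ps, ts)| segment_matches(ps, ts))
}
```

Under these assumptions `topic_matches("*/camera/rgb", "robot-1/camera/rgb")` holds, while a topic with an extra leading segment such as `a/b/camera/rgb` does not match.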
Use Cases:
- Centralized Vision Processing: One node processes camera feeds from entire fleet
- Fleet-wide Logging: Single logger consumes logs from all robots
- Cross-Robot Monitoring: Dashboard aggregates metrics from multiple robots
- Multi-Robot Coordination: Coordinator receives state updates from all robots
Load Balancing with Wildcards:
// Multiple vision processors share the workload
// Node 1
let mut rx1 = bus..await?;
// Node 2
let mut rx2 = bus..await?;
// Messages from all robots are load-balanced across Node 1 and Node 2
Multiple Subscribers
let mut bus = connect.await?;
// Subscribe to multiple topics
let mut rx_scan = bus..await?;
let mut rx_odom = bus..await?;
let mut rx_cmd = bus..await?;
// Use tokio::select! to handle multiple streams
loop
Custom Message Types
use ;
let state = RobotState ;
bus.publish.await?;
Testing
Unit Tests
Integration Tests (Requires Redis)
Start Redis:
Run tests:
Tests included:
- test_pub_sub() - Basic publish/subscribe
- test_multiple_subscribers() - Consumer group behavior
Error Handling
use MessagingError;
match bus.publish.await
Performance Tips
- Batch Publishing - Publish multiple messages in quick succession
- Consumer Groups - Scale processing by adding more consumers
- Acknowledgment - Always ack messages to avoid redelivery
- Namespace - Isolate fleets to reduce cross-talk
Resilience & Error Recovery
The messaging system includes built-in resilience features to handle transient failures automatically:
Subscription Retry Logic
When creating a subscription, the framework automatically retries on failure with exponential backoff:
- Up to 5 retry attempts with delays: 100ms, 200ms, 400ms, 800ms, 1600ms
- Handles "BUSYGROUP" gracefully - recognizes when consumer group already exists
- Detailed logging - warns on each retry attempt with error details
- Fail-fast with clear errors - returns error after all retries exhausted
// Automatic retry - no code changes needed!
let mut rx = bus..await?;
// ⚠️ If consumer group creation fails, automatically retries up to 5 times
// ✅ Succeeds on first retry that works
// ❌ Returns error after all retries fail
What you'll see in logs:
⚠️ Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 1/5): Connection reset. Retrying in 100ms...
⚠️ Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 2/5): Connection reset. Retrying in 200ms...
✅ Created consumer group 'slam--scan' for topic 'mecha10:/scan'
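The retry schedule described in this section can be sketched with the standard library alone. This is not the framework's code: `retry_with_backoff` and `backoff_delay` are hypothetical names, the sleep function is injected so the logic can be exercised without waiting, and the sketch assumes the operation gives up immediately after the fifth failed attempt (the README does not say whether the final 1600ms delay is actually slept).

```rust
use std::time::Duration;

const MAX_ATTEMPTS: u32 = 5;
const BASE_DELAY_MS: u64 = 100;

/// Delay associated with a failed attempt (1-based): 100ms, 200ms, 400ms, ...
fn backoff_delay(attempt: u32) -> Duration {
    Duration::from_millis(BASE_DELAY_MS << (attempt - 1))
}

/// Runs `op` until it succeeds or MAX_ATTEMPTS attempts have failed.
/// `op` receives the 1-based attempt number; `sleep` is called between attempts.
fn retry_with_backoff<T, E>(
    mut op: impl FnMut(u32) -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt == MAX_ATTEMPTS => return Err(err),
            Err(_) => {
                sleep(backoff_delay(attempt));
                attempt += 1;
            }
        }
    }
}
```

In a real program `sleep` would be `std::thread::sleep` or an async equivalent. Doubling the delay keeps early retries fast for short glitches while bounding the load placed on a struggling Redis instance.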
Self-Healing Subscriptions
The background subscription task automatically recovers from consumer group failures:
- Detects "NOGROUP" errors - recognizes when consumer group has been deleted
- Auto-recreates consumer groups - attempts to recreate missing groups
- Continues retrying - keeps trying to reconnect every 1 second
- No manual intervention - subscriptions heal themselves
// Your subscription keeps working even if:
// - Redis gets flushed (FLUSHALL)
// - Consumer group gets manually deleted
// - Redis restarts
while let Some = rx.recv.await
// Background task automatically recovers and continues delivering messages
What you'll see in logs:
❌ XREAD failed for topic 'mecha10:/scan' group 'slam--scan': NOGROUP No such consumer group
🔄 Consumer group 'slam--scan' missing, attempting to recreate...
✅ Recreated consumer group 'slam--scan' for topic 'mecha10:/scan'
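The recovery decision behind these log lines can be sketched as a small classifier over the error text. This is an illustration, not the crate's internals: the `Recovery` enum and `classify_redis_error` are hypothetical, and matching on the `NOGROUP` and `BUSYGROUP` prefixes assumes the error message begins with the Redis error code, as in the log above.

```rust
/// What a background subscription task might do after a failed Redis call.
#[derive(Debug, PartialEq)]
enum Recovery {
    /// The consumer group is gone: recreate it, then read again.
    RecreateGroup,
    /// The group already exists: treat group creation as successful.
    AlreadyExists,
    /// Anything else: wait (for example 1 second) and retry.
    RetryAfterDelay,
}

/// Maps a Redis error message to a recovery action by its error-code prefix.
fn classify_redis_error(message: &str) -> Recovery {
    if message.starts_with("NOGROUP") {
        Recovery::RecreateGroup
    } else if message.starts_with("BUSYGROUP") {
        Recovery::AlreadyExists
    } else {
        Recovery::RetryAfterDelay
    }
}
```

A read loop built on this would call the classifier on each error, recreate the group when told to, and otherwise sleep before the next read, so the subscriber's `recv()` loop never has to handle these cases itself.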
Best Practices
- Monitor logs - Watch for retry warnings to identify infrastructure issues
- Set proper timeouts - Ensure your application can tolerate brief subscription delays
- Handle subscription errors - Catch errors on subscribe() to handle permanent failures
- Don't manually delete consumer groups - Let the framework manage them
Debugging Subscription Issues
If subscriptions fail even with retries, check:
- Redis connectivity: redis-cli -h <host> -p <port> ping
- Redis version: Requires Redis 5.0+ for Streams support
- Redis memory: Ensure sufficient memory for consumer groups
- Permissions: Check Redis ACLs if using authentication
Use these Redis commands to inspect subscriptions:
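# List all consumer groups for a topic
redis-cli XINFO GROUPS <stream-key>
# Check pending messages in a group
redis-cli XPENDING <stream-key> <group>
# List consumers in a group
redis-cli XINFO CONSUMERS <stream-key> <group>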
Architecture
┌─────────────┐
│ Node A │
│ (Publisher) │
└──────┬──────┘
│ publish("/scan")
▼
┌─────────────────────┐
│ Redis Streams │
│ Topic: /scan │
└──────┬──────┬───────┘
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Node B │ │ Node C │
│ (SLAM) │ │ (Logger) │
└──────────┘ └──────────┘
Examples
See QUICKSTART.md for complete examples.
License
MIT