# @mecha10/messaging
Redis Streams-based pub/sub messaging system for inter-node communication in the Mecha10 framework.
## Features
- ✅ **Redis Streams Backend** - Reliable message delivery with persistence
- ✅ **Type-Safe Messaging** - Generic types for compile-time safety
- ✅ **Consumer Groups** - Load balancing across multiple nodes
- ✅ **Message Acknowledgment** - `ack()`/`nack()` for retry logic
- ✅ **Auto-Reconnection** - Automatic connection recovery
- ✅ **Resilient Subscriptions** - Automatic retry with exponential backoff
- ✅ **Self-Healing** - Auto-recovery from consumer group failures
- ✅ **Namespace Support** - Multi-robot fleet isolation
- ✅ **Wildcard Subscriptions** - Subscribe to multiple topics with patterns (e.g., `*/camera/rgb`)
## Installation
```toml
[dependencies]
mecha10-messaging = { path = "../messaging" }
tokio = { version = "1.35", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```
## Quick Start
### Publisher
```rust
use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct LaserScan {
    ranges: Vec<f32>,
    timestamp: u64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut bus = MessageBus::connect("redis://localhost:6379", "lidar-node").await?;

    loop {
        let scan = LaserScan {
            ranges: vec![1.0, 2.0, 3.0, 4.0],
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)?
                .as_millis() as u64,
        };
        bus.publish("/scan", &scan).await?;
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }
}
```
### Subscriber
```rust
use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};

// Same message type as in the publisher example
#[derive(Debug, Serialize, Deserialize)]
struct LaserScan {
    ranges: Vec<f32>,
    timestamp: u64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut bus = MessageBus::connect("redis://localhost:6379", "slam-node").await?;

    // Subscribe with consumer group
    let mut rx = bus.subscribe::<LaserScan>("/scan", "slam-group").await?;

    while let Some(msg) = rx.recv().await {
        println!("Received scan from {}: {} points",
            msg.publisher, msg.payload.ranges.len());
        // Process the scan...
        // Acknowledge successful processing
        msg.ack().await?;
    }
    Ok(())
}
```
## Core Concepts
### Topics
Topics are named channels for messages, following a hierarchical naming convention:
```
/scan          # LiDAR scans
/camera/rgb    # RGB camera frames
/camera/depth  # Depth camera frames
/odom          # Odometry
/cmd_vel       # Velocity commands
```
### Consumer Groups
Consumer groups enable **load balancing** - each message is delivered to only one consumer in the group:
```rust
// Node 1
let mut rx1 = bus.subscribe::<Task>("/tasks", "worker-group").await?;
// Node 2
let mut rx2 = bus.subscribe::<Task>("/tasks", "worker-group").await?;
// Each task goes to either Node 1 OR Node 2 (not both)
```
Different groups receive **all** messages:
```rust
// SLAM node
let mut rx_slam = bus.subscribe::<Scan>("/scan", "slam-group").await?;
// Logger node
let mut rx_log = bus.subscribe::<Scan>("/scan", "logger-group").await?;
// Both receive the same scans
```
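The delivery rule can be illustrated without Redis. The following is a toy in-memory model, not the crate's implementation: `ToyStream`, `join`, and `publish` are hypothetical names, and round-robin assignment is an assumption made for illustration (Redis hands each entry to whichever consumer in the group reads next).

```rust
use std::collections::HashMap;

/// Toy model of one stream and its consumer groups (illustrative only).
#[derive(Default)]
struct ToyStream {
    // group name -> (consumer names, counter used for round-robin selection)
    groups: HashMap<String, (Vec<String>, usize)>,
}

impl ToyStream {
    /// Registers `consumer` in `group`, creating the group if needed.
    fn join(&mut self, group: &str, consumer: &str) {
        self.groups
            .entry(group.to_string())
            .or_default()
            .0
            .push(consumer.to_string());
    }

    /// Delivers one message: every group receives it, but only one consumer
    /// per group. Returns (group, consumer) pairs sorted by group name.
    fn publish(&mut self) -> Vec<(String, String)> {
        let mut deliveries = Vec::new();
        for (group, (consumers, next)) in self.groups.iter_mut() {
            if consumers.is_empty() {
                continue;
            }
            let chosen = consumers[*next % consumers.len()].clone();
            *next += 1;
            deliveries.push((group.clone(), chosen));
        }
        deliveries.sort();
        deliveries
    }
}
```

With two consumers in `worker-group` and one in `logger-group`, each call to `publish` in this model reaches the logger every time and alternates between the two workers.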
### Message Acknowledgment
Messages must be acknowledged after processing:
```rust
while let Some(msg) = rx.recv().await {
    match process_message(&msg.payload) {
        Ok(_) => {
            msg.ack().await?; // Mark as successfully processed
        }
        Err(e) => {
            eprintln!("Processing failed: {}", e);
            msg.nack().await?; // Will be redelivered
        }
    }
}
```
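To make the ack/nack bookkeeping concrete, here is a small in-memory sketch. It is a toy model under stated assumptions, not how the crate or Redis tracks pending entries: `ToyGroup` and its methods are hypothetical, and a nacked message is simply queued again for the next `recv`.

```rust
use std::collections::{HashMap, VecDeque};

/// Toy model of a consumer group's delivery state (illustrative only).
#[derive(Default)]
struct ToyGroup {
    /// Message IDs waiting to be delivered or redelivered.
    queue: VecDeque<u64>,
    /// Delivered but not yet acknowledged: ID -> number of deliveries so far.
    pending: HashMap<u64, u32>,
}

impl ToyGroup {
    fn publish(&mut self, id: u64) {
        self.queue.push_back(id);
    }

    /// Hands out the next ID and records it as pending.
    fn recv(&mut self) -> Option<u64> {
        let id = self.queue.pop_front()?;
        *self.pending.entry(id).or_insert(0) += 1;
        Some(id)
    }

    /// Successful processing: the message is forgotten.
    fn ack(&mut self, id: u64) {
        self.pending.remove(&id);
    }

    /// Failed processing: the message stays pending and is queued again.
    fn nack(&mut self, id: u64) {
        if self.pending.contains_key(&id) {
            self.queue.push_back(id);
        }
    }

    /// Number of times a still-pending message has been delivered.
    fn delivery_count(&self, id: u64) -> u32 {
        self.pending.get(&id).copied().unwrap_or(0)
    }
}
```

In this model a message that is never acked or nacked stays in `pending` indefinitely, which is why every received message should end in one of the two calls.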
### Namespaces
Isolate multiple robots using the same Redis instance:
```rust
let mut bus = MessageBus::connect("redis://localhost:6379", "robot-1").await?;
bus.set_namespace("fleet-alpha");
// Messages published to "fleet-alpha:/scan"
bus.publish("/scan", &scan_data).await?;
```
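Judging by the comment above and the `mecha10:/scan` keys in the log samples later in this README, the stream key looks like the namespace and topic joined by a colon. A minimal sketch of that composition, assuming a hypothetical helper `stream_key` that is not part of the crate's API:

```rust
/// Hypothetical helper: builds a Redis stream key from a namespace and topic.
/// The `namespace:topic` layout is inferred from this README's examples,
/// not taken from the crate's source.
fn stream_key(namespace: &str, topic: &str) -> String {
    format!("{}:{}", namespace, topic)
}
```

Under that assumption, two fleets publishing to `/scan` write to different keys (`fleet-alpha:/scan` and `fleet-beta:/scan`), so their consumers never see each other's messages.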
## API Reference
### `MessageBus`
**Connect:**
```rust
let mut bus = MessageBus::connect(redis_url, node_id).await?;
```
**Set namespace:**
```rust
bus.set_namespace("my-fleet");
```
**Publish:**
```rust
bus.publish(topic, &payload).await?;
```
**Subscribe:**
```rust
let mut rx = bus.subscribe::<T>(topic, consumer_group).await?;
```
**Subscribe with wildcard pattern:**
```rust
let mut rx = bus.subscribe_pattern::<T>(pattern, consumer_group).await?;
```
**Discover topics:**
```rust
let topics = bus.discover_topics(redis_pattern).await?;
```
**Close:**
```rust
bus.close().await?;
```
### `Message<T>`
**Fields:**
```rust
pub struct Message<T> {
    pub id: String,        // Redis Stream ID
    pub topic: String,     // Topic name
    pub publisher: String, // Publisher node ID
    pub timestamp: u64,    // Unix timestamp (ms)
    pub payload: T,        // Your data
}
```
**Methods:**
```rust
msg.ack().await?; // Acknowledge
msg.nack().await?; // Negative acknowledgment (retry)
```
### `Subscriber<T>`
**Receive message:**
```rust
let msg = rx.recv().await;
```
**Get topic:**
```rust
let topic = rx.topic();
```
## Advanced Usage
### Wildcard Subscriptions
Subscribe to multiple topics at once using wildcard patterns. This is essential for remote nodes that aggregate data from multiple robots:
```rust
use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct CameraFrame {
    data: Vec<u8>,
    width: u32,
    height: u32,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a vision processing node that receives camera feeds from ALL robots
    let mut bus = MessageBus::connect("redis://localhost:6379", "vision-processor").await?;

    // Subscribe to camera/rgb topic from all robots using wildcard pattern
    let mut rx = bus.subscribe_pattern::<CameraFrame>("*/camera/rgb", "vision-group").await?;

    while let Some(msg) = rx.recv().await {
        // msg.topic is the concrete topic that matched the pattern
        println!("Received frame on topic: {} (publisher: {})",
            msg.topic, msg.publisher);
        // Process the frame (e.g., run YOLO detection);
        // process_frame is your own async function
        process_frame(&msg.payload).await?;
        msg.ack().await?;
    }
    Ok(())
}
```
**Pattern Syntax:**
- `*/camera/rgb` - Matches `robot-1/camera/rgb`, `robot-2/camera/rgb`, etc.
- `robot-*/scan` - Matches `robot-1/scan`, `robot-alpha/scan`, etc.
- `*/sensor/*` - Matches any topic with "sensor" in the middle segment
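
The pattern rules above can be sketched as a small matcher. This is an illustration, not the crate's matching code: `topic_matches` and `segment_matches` are hypothetical names, and the sketch assumes that `*` matches any run of characters within a single `/`-separated segment and never crosses a `/`.

```rust
/// Matches one path segment against a pattern segment, where `*` stands for
/// any run of characters (including none).
fn segment_matches(pattern: &str, text: &str) -> bool {
    let parts: Vec<&str> = pattern.split('*').collect();
    if parts.len() == 1 {
        return pattern == text; // no wildcard: exact match
    }
    let (first, last) = (parts[0], parts[parts.len() - 1]);
    if !text.starts_with(first) {
        return false;
    }
    let mut rest = &text[first.len()..];
    // Literal pieces between wildcards must appear in order.
    for &mid in &parts[1..parts.len() - 1] {
        match rest.find(mid) {
            Some(i) => rest = &rest[i + mid.len()..],
            None => return false,
        }
    }
    rest.ends_with(last)
}

/// Hypothetical matcher: compares pattern and topic segment by segment,
/// so both must have the same number of `/`-separated segments.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    let p: Vec<&str> = pattern.split('/').collect();
    let t: Vec<&str> = topic.split('/').collect();
    p.len() == t.len() && p.iter().zip(&t).all(|(ps, ts)| segment_matches(ps, ts))
}
```

Under these assumptions `topic_matches("robot-*/scan", "robot-alpha/scan")` holds, while `*/camera/rgb` does not match `fleet/robot-1/camera/rgb` because the segment counts differ.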
**Use Cases:**
- **Centralized Vision Processing**: One node processes camera feeds from entire fleet
- **Fleet-wide Logging**: Single logger consumes logs from all robots
- **Cross-Robot Monitoring**: Dashboard aggregates metrics from multiple robots
- **Multi-Robot Coordination**: Coordinator receives state updates from all robots
**Load Balancing with Wildcards:**
```rust
// Multiple vision processors share the workload
// Node 1
let mut rx1 = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-group").await?;
// Node 2
let mut rx2 = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-group").await?;
// Messages from all robots are load-balanced across Node 1 and Node 2
```
### Multiple Subscribers
```rust
let mut bus = MessageBus::connect("redis://localhost:6379", "multi-node").await?;

// Subscribe to multiple topics
let mut rx_scan = bus.subscribe::<LaserScan>("/scan", "processing").await?;
let mut rx_odom = bus.subscribe::<Odometry>("/odom", "processing").await?;
let mut rx_cmd = bus.subscribe::<Twist>("/cmd_vel", "execution").await?;

// Use tokio::select! to handle multiple streams
loop {
    tokio::select! {
        Some(msg) = rx_scan.recv() => {
            process_scan(&msg.payload).await?;
            msg.ack().await?;
        }
        Some(msg) = rx_odom.recv() => {
            process_odom(&msg.payload).await?;
            msg.ack().await?;
        }
        Some(msg) = rx_cmd.recv() => {
            process_cmd(&msg.payload).await?;
            msg.ack().await?;
        }
        // Every subscriber returned None: without this branch select! would panic
        else => break,
    }
}
```
### Custom Message Types
```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct RobotState {
    position: (f32, f32, f32),
    velocity: (f32, f32, f32),
    battery: f32,
    mode: String,
}

let state = RobotState {
    position: (1.0, 2.0, 0.5),
    velocity: (0.5, 0.0, 0.1),
    battery: 87.5,
    mode: "autonomous".to_string(),
};
bus.publish("/state", &state).await?;
```
## Testing
### Unit Tests
```bash
cargo test
```
### Integration Tests (Requires Redis)
Start Redis:
```bash
docker run -d -p 6379:6379 redis:latest
```
Run tests:
```bash
cargo test -- --ignored
```
**Tests included:**
- `test_pub_sub()` - Basic publish/subscribe
- `test_multiple_subscribers()` - Consumer group behavior
## Error Handling
```rust
use mecha10_messaging::MessagingError;

match bus.publish("/scan", &data).await {
    Ok(_) => println!("Published"),
    Err(MessagingError::Redis(e)) => eprintln!("Redis error: {}", e),
    Err(MessagingError::Serialization(e)) => eprintln!("Serialization error: {}", e),
    Err(MessagingError::Connection(msg)) => eprintln!("Connection error: {}", msg),
    Err(e) => eprintln!("Error: {}", e),
}
```
## Performance Tips
1. **Batch Publishing** - Publish multiple messages in quick succession
2. **Consumer Groups** - Scale processing by adding more consumers
3. **Acknowledgment** - Always ack messages to avoid redelivery
4. **Namespace** - Isolate fleets to reduce cross-talk
## Resilience & Error Recovery
The messaging system includes built-in resilience features to handle transient failures automatically:
### Subscription Retry Logic
When creating a subscription, the framework automatically retries on failure with exponential backoff:
- **Up to 5 retry attempts** with delays: 100ms, 200ms, 400ms, 800ms, 1600ms
- **Handles "BUSYGROUP" gracefully** - recognizes when consumer group already exists
- **Detailed logging** - warns on each retry attempt with error details
- **Fail-fast with clear errors** - returns error after all retries exhausted
```rust
// Automatic retry - no code changes needed!
let mut rx = bus.subscribe::<LaserScan>("/scan", "processing").await?;
// ⚠️ If consumer group creation fails, automatically retries up to 5 times
// ✅ Succeeds on first retry that works
// ❌ Returns error after all retries fail
```
**What you'll see in logs:**
```
⚠️ Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 1/5): Connection reset. Retrying in 100ms...
⚠️ Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 2/5): Connection reset. Retrying in 200ms...
✅ Created consumer group 'slam--scan' for topic 'mecha10:/scan'
```
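The retry delays listed above follow a doubling schedule starting at 100ms. A minimal sketch of how such a schedule can be computed, assuming hypothetical helpers `backoff_delay` and `backoff_schedule` (the crate's internal retry code may be structured differently):

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (1-based), doubling from `base`.
fn backoff_delay(base: Duration, attempt: u32) -> Duration {
    // attempt 1 -> base, attempt 2 -> 2 * base, attempt 3 -> 4 * base, ...
    base * 2u32.saturating_pow(attempt.saturating_sub(1))
}

/// Full schedule for `max_attempts` retries.
fn backoff_schedule(base: Duration, max_attempts: u32) -> Vec<Duration> {
    (1..=max_attempts)
        .map(|attempt| backoff_delay(base, attempt))
        .collect()
}
```

With a 100ms base and 5 attempts, the schedule sums to 3100ms, so a subscription that never succeeds waits roughly 3.1 seconds in total, plus the time of each failed attempt.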
### Self-Healing Subscriptions
The background subscription task automatically recovers from consumer group failures:
- **Detects "NOGROUP" errors** - recognizes when consumer group has been deleted
- **Auto-recreates consumer groups** - attempts to recreate missing groups
- **Continues retrying** - retries once per second until the connection is restored
- **No manual intervention** - subscriptions heal themselves
```rust
// Your subscription keeps working even if:
// - Redis gets flushed (FLUSHALL)
// - Consumer group gets manually deleted
// - Redis restarts
while let Some(msg) = rx.recv().await {
    process(&msg.payload).await?;
    msg.ack().await?;
}
// Background task automatically recovers and continues delivering messages
```
**What you'll see in logs:**
```
❌ XREAD failed for topic 'mecha10:/scan' group 'slam--scan': NOGROUP No such consumer group
🔄 Consumer group 'slam--scan' missing, attempting to recreate...
✅ Recreated consumer group 'slam--scan' for topic 'mecha10:/scan'
```
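The recovery behavior described in this section comes down to classifying the Redis error and choosing a reaction. The sketch below shows one way to express that decision. `Recovery` and `recovery_for` are hypothetical names, and matching on the error text is an assumption made for illustration; the crate may inspect error codes instead.

```rust
/// What a subscription loop could do after a failed Redis call.
#[derive(Debug, PartialEq)]
enum Recovery {
    /// Group already exists (BUSYGROUP): safe to carry on.
    Proceed,
    /// Group is gone (NOGROUP): recreate it before reading again.
    RecreateGroup,
    /// Anything else: wait, then retry.
    RetryAfterDelay,
}

/// Maps a Redis error message to a recovery action by looking for the
/// BUSYGROUP and NOGROUP error prefixes that Redis returns.
fn recovery_for(error_text: &str) -> Recovery {
    if error_text.contains("BUSYGROUP") {
        Recovery::Proceed
    } else if error_text.contains("NOGROUP") {
        Recovery::RecreateGroup
    } else {
        Recovery::RetryAfterDelay
    }
}
```

For the log lines above, `recovery_for("NOGROUP No such consumer group")` yields `Recovery::RecreateGroup`, while a transient failure such as `"Connection reset"` falls through to `Recovery::RetryAfterDelay`.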
### Best Practices
1. **Monitor logs** - Watch for retry warnings to identify infrastructure issues
2. **Set proper timeouts** - Ensure your application can tolerate brief subscription delays
3. **Handle subscription errors** - Catch errors on `subscribe()` to handle permanent failures
4. **Don't manually delete consumer groups** - Let the framework manage them
### Debugging Subscription Issues
If subscriptions fail even with retries, check:
1. **Redis connectivity**: `redis-cli -h <host> -p <port> ping`
2. **Redis version**: Requires Redis 5.0+ for Streams support
3. **Redis memory**: Ensure sufficient memory for consumer groups
4. **Permissions**: Check Redis ACLs if using authentication
Use these Redis commands to inspect subscriptions:
```bash
# List all consumer groups for a topic
redis-cli XINFO GROUPS "mecha10:/scan"
# Check pending messages in a group
redis-cli XPENDING "mecha10:/scan" "slam--scan"
# List consumers in a group
redis-cli XINFO CONSUMERS "mecha10:/scan" "slam--scan"
```
## Architecture
```
┌─────────────┐
│   Node A    │
│ (Publisher) │
└──────┬──────┘
       │ publish("/scan")
       ▼
┌─────────────────────┐
│   Redis Streams     │
│   Topic: /scan      │
└──────┬──────┬───────┘
       │      │
       ▼      ▼
┌──────────┐ ┌──────────┐
│  Node B  │ │  Node C  │
│  (SLAM)  │ │ (Logger) │
└──────────┘ └──────────┘
```
## Examples
See [QUICKSTART.md](../../QUICKSTART.md) for complete examples.
## License
MIT