# Backpressure Handling Guide
## Overview
Backpressure is a mechanism to prevent memory exhaustion when publishers send messages faster than subscribers can process them. Without backpressure, unbounded message queues can grow indefinitely, leading to out-of-memory errors and system crashes.
## The Problem
```
Publisher (60 FPS) ──→ Unbounded Queue ──→ Subscriber (10 FPS)
                              ↓
                         Queue grows
                         indefinitely!
                              ↓
                         OOM crash 💥
```
**Scenario:** A camera publishes RGB images at 60 FPS, but your vision processing node can only handle 10 FPS. Without backpressure, the queue grows by 50 messages per second. For uncompressed 1080p RGB frames (~6.22 MB each), that is ~311 MB/s, or roughly 18.7 GB per minute!
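The growth rate in the scenario is just the rate difference times the message size. The sketch below makes that arithmetic explicit; `queue_growth` is a hypothetical helper for illustration, not part of the Mecha10 API, and it assumes uncompressed messages of a fixed size.

```rust
/// Hypothetical helper: how fast an unbounded queue grows when the
/// publisher outpaces the subscriber. Returns (messages/s, bytes/s).
fn queue_growth(publish_hz: f64, process_hz: f64, msg_bytes: f64) -> (f64, f64) {
    // A subscriber that keeps up never accumulates a backlog.
    let msgs_per_sec = (publish_hz - process_hz).max(0.0);
    (msgs_per_sec, msgs_per_sec * msg_bytes)
}
```

With the scenario's numbers, `queue_growth(60.0, 10.0, 1920.0 * 1080.0 * 3.0)` gives 50 messages/s and about 311 MB/s, i.e. roughly 18.7 GB per minute.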
## The Solution
Mecha10 provides multiple backpressure strategies through the `Receiver` API:
1. **Bounded channels** - Limit queue size
2. **Drop oldest** - Keep only recent messages
3. **Monitoring** - Track utilization and adapt
## Quick Start
### Default (Unsafe for Fast Publishers)
```rust
// ⚠️ Risk: Unbounded queue can grow without limit
let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
```
### Safe (Bounded with Backpressure)
```rust
// ✅ Safe: Queue limited to 50 messages
let mut images = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .with_capacity(50);
```
### Real-Time Data (Drop Oldest)
```rust
// ✅ Best for sensors: Always get latest data
let mut lidar = ctx.subscribe(sensor::LIDAR_SCAN)
    .await?
    .with_drop_oldest(10);
```
## Backpressure Strategies
### 1. Bounded Channel (`.with_capacity(N)`)
**Use when:** You need reliable message delivery with bounded memory.
**Behavior:** Blocks publisher when queue is full (backpressure).
```rust
let mut cmd_queue = ctx.subscribe(robot::COMMANDS)
    .await?
    .with_capacity(100); // Max 100 commands buffered
while let Some(cmd) = cmd_queue.recv().await {
    execute_command(&cmd).await?;
}
```
**Pros:**
- Guaranteed memory bound
- No message loss (if publisher respects backpressure)
- Predictable behavior
**Cons:**
- Can slow down publishers
- Can deadlock if the subscriber waits on a task that is itself blocked publishing into this queue
**Best for:**
- Command queues
- Critical messages
- Transactional data
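Mecha10's channel internals are not shown in this guide, but the blocking-when-full behavior described above matches Rust's standard `std::sync::mpsc::sync_channel`. The sketch below uses it purely as an analogy: `demo_bounded` is a hypothetical helper that fills a bounded channel and shows that the next send is refused until the subscriber drains a message.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical demo: fill a bounded channel and report whether the next
/// send was refused because the buffer was full.
fn demo_bounded(capacity: usize) -> bool {
    // Capacity 0 would be a rendezvous channel; keep the demo to buffered ones.
    assert!(capacity > 0);
    let (tx, rx) = sync_channel::<u32>(capacity);
    for i in 0..capacity as u32 {
        // These sends succeed immediately: there is room in the buffer.
        tx.send(i).expect("receiver is alive");
    }
    // A blocking `send` would now wait until the subscriber calls `recv`;
    // `try_send` exposes the same "full" condition without blocking the demo.
    let full = matches!(tx.try_send(99), Err(TrySendError::Full(99)));
    // Draining one message frees a slot, so the publisher can proceed.
    assert_eq!(rx.recv().unwrap(), 0);
    assert!(tx.try_send(100).is_ok());
    full
}
```

`demo_bounded(3)` returns `true`: nothing is lost, but the publisher has to wait for space, which is exactly the trade-off listed in the pros and cons.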
### 2. Drop Oldest (`.with_drop_oldest(N)`)
**Use when:** Recent data is more valuable than old data (real-time systems).
**Behavior:** Drops oldest message when queue is full.
```rust
let mut camera = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .with_drop_oldest(5); // Keep only 5 most recent images
while let Some(image) = camera.recv().await {
    // Always process relatively recent images
    detect_objects(&image).await?;
}
```
**Pros:**
- Always get recent data
- No publisher blocking
- Predictable latency
**Cons:**
- Message loss (by design)
- Not suitable for critical data
**Best for:**
- Sensor data (camera, lidar, IMU)
- Real-time visualization
- Telemetry streams
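To make the drop-oldest policy concrete, here is a minimal ring-buffer sketch built on `VecDeque`. It is an illustration of the behavior described above, not Mecha10's implementation; the `DropOldest` type and its `dropped` counter are hypothetical.

```rust
use std::collections::VecDeque;

/// Hypothetical drop-oldest buffer (illustrative; not Mecha10's implementation).
struct DropOldest<T> {
    buf: VecDeque<T>,
    capacity: usize,
    dropped: u64, // messages discarded to make room for newer ones
}

impl<T> DropOldest<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be at least 1");
        Self { buf: VecDeque::with_capacity(capacity), capacity, dropped: 0 }
    }

    /// Never blocks the publisher: when full, evict the oldest message first.
    fn push(&mut self, msg: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.dropped += 1;
        }
        self.buf.push_back(msg);
    }

    /// Subscriber side: take the oldest message still buffered.
    fn pop(&mut self) -> Option<T> {
        self.buf.pop_front()
    }
}
```

Pushing messages 1 through 7 into `DropOldest::new(5)` leaves 3 through 7 buffered and counts 2 drops. Counting drops is a cheap way to confirm in production that the loss rate is what you expect.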
### 3. Unbounded (Default)
**Use when:** Publisher and subscriber rates are balanced.
**Behavior:** No limit on queue size.
```rust
let mut events = ctx.subscribe(system::EVENTS).await?;
// OK if events are infrequent
while let Some(event) = events.recv().await {
    log_event(&event);
}
```
**Pros:**
- No complexity
- No backpressure delays
- Simple mental model
**Cons:**
- Risk of OOM
- Unbounded latency growth
- Hard to debug memory issues
**Best for:**
- Low-frequency messages
- Test environments
- Perfectly balanced pub/sub rates
## Monitoring and Metrics
### Check Queue Depth
```rust
let depth = receiver.len();
info!("Current queue depth: {}", depth);
```
### Check Capacity
```rust
if let Some(cap) = receiver.capacity() {
    info!("Channel capacity: {}", cap);
} else {
    warn!("Unbounded channel - no capacity limit!");
}
```
### Check Utilization
```rust
if let Some(util) = receiver.utilization() {
    if util > 0.8 {
        warn!("Queue is {:.1}% full - approaching backpressure!", util * 100.0);
    }
}
```
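The guide does not spell out how `utilization()` is computed. A plausible reading, consistent with `capacity()` returning `None` for unbounded channels, is queue depth divided by capacity. The `QueueStats` type below is a hypothetical stand-in used only to illustrate that assumption.

```rust
/// Hypothetical stand-in for the receiver's queue state, illustrating how
/// `len`, `capacity` and `utilization` plausibly relate (an assumption).
struct QueueStats {
    len: usize,
    capacity: Option<usize>, // None = unbounded
}

impl QueueStats {
    /// Fraction of capacity in use (0.0..=1.0); None for unbounded queues,
    /// where "percent full" has no meaning.
    fn utilization(&self) -> Option<f64> {
        match self.capacity {
            Some(cap) if cap > 0 => Some(self.len as f64 / cap as f64),
            _ => None,
        }
    }
}
```

Under this assumption, 40 queued messages in a capacity-50 channel report 0.8, which is exactly where the warning above starts firing.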
### Automatic Monitoring
```rust
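use std::time::Duration;

let mut images = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .with_capacity(100);
// `images` cannot be moved into a separate monitoring task and still be
// drained here, so sample utilization on a timer inside the processing loop.
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
    tokio::select! {
        maybe_image = images.recv() => match maybe_image {
            Some(image) => detect_objects(&image).await?,
            None => break, // channel closed
        },
        _ = interval.tick() => {
            let util = images.utilization().unwrap_or(0.0);
            // Log metrics
            info!("Queue utilization: {:.1}%", util * 100.0);
            // Alert on thresholds
            if util > 0.95 {
                error!("CRITICAL: Queue nearly full!");
            } else if util > 0.8 {
                warn!("WARNING: High queue utilization");
            }
        }
    }
}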
```
## Adaptive Processing
Adjust processing strategy based on backpressure level:
```rust
let mut images = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .with_capacity(100);
while let Some(image) = images.recv().await {
    let util = images.utilization().unwrap_or(0.0);
    if util > 0.9 {
        // High backpressure - use fast mode
        process_image_fast(&image)?;
    } else if util > 0.5 {
        // Moderate - normal mode
        process_image_normal(&image)?;
    } else {
        // Low load - detailed mode
        process_image_detailed(&image)?;
    }
}
```
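One way to keep the thresholds above testable is to pull the decision into a pure function and dispatch on its result. `Mode` and `choose_mode` are hypothetical names used for this sketch; the thresholds are the ones from the loop above.

```rust
/// Hypothetical processing modes matching the loop above.
#[derive(Debug, PartialEq)]
enum Mode {
    Fast,
    Normal,
    Detailed,
}

/// Same thresholds as the loop: above 90% -> fast, above 50% -> normal,
/// otherwise detailed. Comparisons are strict, so exactly 0.9 is "normal".
fn choose_mode(utilization: f64) -> Mode {
    if utilization > 0.9 {
        Mode::Fast
    } else if utilization > 0.5 {
        Mode::Normal
    } else {
        Mode::Detailed
    }
}
```

The loop body then becomes a `match choose_mode(util) { ... }`, and the boundary behavior can be unit-tested without a running subscriber.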
## Choosing Capacity Values
### Guidelines
| Data Type | Recommended Capacity | Strategy |
|-----------|----------------------|----------|
| Camera images (HD) | 5-20 | Drop oldest |
| LiDAR scans | 10-50 | Drop oldest |
| IMU data | 50-200 | Drop oldest |
| Commands | 100-500 | Bounded |
| Events | 50-100 | Bounded |
| Logs | 1000+ or unbounded | Bounded or unbounded |
### Calculation
```
Capacity = (Publisher Rate) × (Processing Time) × (Safety Factor)
Example: Camera at 30 FPS, processing takes 50ms
Capacity = 30 msg/s × 0.05s × 2 = 3 messages
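Use 5-10 to be safe.
```
Note that the first two factors give the number of messages that arrive while one message is being processed. When that product exceeds 1 (here 30 × 0.05 = 1.5), the subscriber is slower than the publisher and any bounded queue will eventually fill, so capacity only absorbs bursts. In that case use `.with_drop_oldest()` (as the table recommends for cameras) or speed up processing.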
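The capacity formula translates directly into code. The sketch below rounds up so the queue is never under-provisioned and also exposes the arrivals-per-step check; both function names are hypothetical helpers, not Mecha10 API.

```rust
/// Capacity = publish rate × processing time × safety factor, rounded up.
/// Hypothetical helper for sizing `.with_capacity()` / `.with_drop_oldest()`.
fn suggested_capacity(publish_hz: f64, process_secs: f64, safety: f64) -> usize {
    let raw = publish_hz * process_secs * safety;
    // Round up so we never under-provision, and keep at least one slot.
    (raw.ceil() as usize).max(1)
}

/// Messages arriving per processing step. Above 1.0 the subscriber is slower
/// than the publisher, so a bounded queue will eventually fill regardless.
fn arrivals_per_step(publish_hz: f64, process_secs: f64) -> f64 {
    publish_hz * process_secs
}
```

For the camera example, `suggested_capacity(30.0, 0.05, 2.0)` returns 3 and `arrivals_per_step(30.0, 0.05)` returns 1.5, which flags that drop-oldest is the safer choice.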
### Safety Factor
- **1.0x** - Minimal buffering (risky)
- **2.0x** - Normal buffering (recommended)
- **5.0x** - Large buffering (safe but uses more memory)
- **10.0x+** - Excessive buffering (use only when clearly needed)
## Memory Usage Calculation
```
Memory = Capacity × Message Size
Example: HD images (1920×1080 RGB)
Message Size = 1920 × 1080 × 3 = 6.22 MB
Capacity = 10 images
Memory = 10 × 6.22 MB = 62.2 MB
```
For multiple subscribers:
```
Total Memory = Σ(Capacity_i × Message_Size_i)
```
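Both memory formulas can be checked in a few lines. This sketch assumes uncompressed images with one byte per channel, as in the HD example; the helper names are hypothetical.

```rust
/// Bytes for an uncompressed image: width × height × channels (1 byte each).
fn image_bytes(width: u64, height: u64, channels: u64) -> u64 {
    width * height * channels
}

/// Total = Σ(capacity_i × message_size_i) over all bounded subscribers,
/// given as (capacity, message size in bytes) pairs.
fn total_buffer_bytes(subscribers: &[(u64, u64)]) -> u64 {
    subscribers
        .iter()
        .map(|&(capacity, msg_bytes)| capacity * msg_bytes)
        .sum()
}
```

`image_bytes(1920, 1080, 3)` is 6,220,800 bytes, and `total_buffer_bytes(&[(10, image_bytes(1920, 1080, 3))])` is 62,208,000 bytes, matching the 62.2 MB above.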
## Common Patterns
### Pattern 1: Sensor Processing
```rust
// Camera at 60 FPS, we process at 20 FPS (~50 ms per frame)
let mut images = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .with_drop_oldest(5); // Keep 5 most recent: at most ~250 ms of backlog at 20 FPS
while let Some(image) = images.recv().await {
    detect_objects(&image).await?;
}
```
### Pattern 2: Command Queue
```rust
// Commands must not be lost
let mut commands = ctx.subscribe(robot::COMMANDS)
    .await?
    .with_capacity(200); // Large buffer for reliability
while let Some(cmd) = commands.recv().await {
    execute(&cmd).await?;
}
```
### Pattern 3: Multi-Speed Subscribers
```rust
// Fast consumer
let mut fast = ctx.subscribe(sensor::DATA)
    .await?
    .with_capacity(10);
// Slow consumer
let mut slow = ctx.subscribe(sensor::DATA)
    .await?
    .with_drop_oldest(50); // Higher capacity + drop old
tokio::spawn(async move {
    while let Some(data) = fast.recv().await {
        quick_process(&data);
    }
});
tokio::spawn(async move {
    while let Some(data) = slow.recv().await {
        detailed_process(&data).await;
    }
});
```
### Pattern 4: Rate Limiting
```rust
let mut high_freq = ctx.subscribe(sensor::IMU)
    .await?
    .with_drop_oldest(10) // Only keep recent
    .throttle(Duration::from_millis(100)); // Rate limit to 10 Hz
while let Some(imu) = high_freq.recv().await {
    // Process at max 10 Hz even if IMU publishes at 200 Hz
}
```
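The exact semantics of `.throttle()` live in the stream combinators module and are not described here. As an illustration of the general idea, this sketch passes a message only if at least a minimum gap has elapsed since the last one passed; the `Throttle` type is hypothetical, and Mecha10's combinator may differ (for example, in whether it emits the first or the latest message in each window).

```rust
/// Hypothetical timestamp-based throttle (illustrative; Mecha10's
/// `.throttle()` may behave differently).
struct Throttle {
    min_gap_us: u64,
    last_us: Option<u64>,
}

impl Throttle {
    fn new(min_gap_us: u64) -> Self {
        Self { min_gap_us, last_us: None }
    }

    /// Returns true if a message stamped `now_us` should be processed.
    fn allow(&mut self, now_us: u64) -> bool {
        match self.last_us {
            // Too soon after the last accepted message: skip it.
            Some(last) if now_us.saturating_sub(last) < self.min_gap_us => false,
            _ => {
                self.last_us = Some(now_us);
                true
            }
        }
    }
}
```

Feeding one second of 200 Hz IMU timestamps (every 5,000 µs) through `Throttle::new(100_000)` lets 10 messages through, matching the 10 Hz target.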
## Debugging Backpressure Issues
### Symptom: Out of Memory
**Diagnosis:**
```rust
info!("Queue depth: {}", receiver.len());
info!("Capacity: {:?}", receiver.capacity());
```
**Solution:**
- Add `.with_capacity()` or `.with_drop_oldest()`
- Increase processing speed
- Reduce publisher rate
### Symptom: Stale Data
**Diagnosis:**
```rust
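// saturating_sub avoids an underflow panic if the stamp is slightly ahead of now (clock skew)
let age = now_micros().saturating_sub(message.header.stamp);
if age > 1_000_000 { // 1 second
    warn!("Processing stale data: {}µs old", age);
}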
```
**Solution:**
- Use `.with_drop_oldest()` instead of `.with_capacity()`
- Reduce capacity value
- Increase processing speed
### Symptom: Publisher Blocking
**Diagnosis:**
- Check if using `.with_capacity()`
- Monitor utilization: if it sits at 100%, the publisher is being blocked
**Solution:**
- Increase capacity
- Use `.with_drop_oldest()` if message loss is acceptable
- Add more subscriber instances (scale horizontally)
## Best Practices
1. **Always use backpressure for high-frequency data**
```rust
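// ❌ Unbounded: queue grows without limit if processing falls behind
// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;

// ✅ Bounded: keeps only the 10 freshest frames
let mut images = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .with_drop_oldest(10);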
```
2. **Monitor utilization in production**
```rust
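if let Some(util) = receiver.utilization() {
    // Macro form from `metrics` crate versions before 0.22; newer versions
    // use `metrics::gauge!("queue.utilization").set(util)`.
    metrics::gauge!("queue.utilization", util);
}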
```
3. **Choose the right strategy**
- Sensor data → `.with_drop_oldest()`
- Commands → `.with_capacity()`
- Logs → Unbounded or large `.with_capacity()`
4. **Calculate capacity based on rates**
```
Capacity = (Publish Rate) × (Process Time) × (Safety Factor)
```
5. **Test under load**
- Simulate fast publishers
- Monitor memory usage
- Verify message loss is acceptable
6. **Document your choices**
```rust
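// Camera publishes at 60 FPS; detection runs at ~20 FPS.
// Drop oldest: stale frames are useless for detection, and 5 frames of
// 1080p RGB bound this queue to ~31 MB.
let mut camera = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .with_drop_oldest(5);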
```
## Migration Guide
### From Unbounded to Bounded
**Before:**
```rust
let mut receiver = ctx.subscribe(topic).await?;
```
**After:**
```rust
let mut receiver = ctx.subscribe(topic)
    .await?
    .with_capacity(50); // Add backpressure
```
**Testing:**
1. Start with a large capacity (e.g., 1000)
2. Monitor utilization under realistic peak load
3. Reduce capacity until peak utilization reaches 70-80%
4. Double the resulting capacity as a safety factor
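One reading of these tuning steps, sketched as arithmetic: pick the capacity at which the observed peak depth sits at the target utilization, then multiply by the safety factor. `tuned_capacity` is a hypothetical helper, and the peak depth is assumed to come from logging `receiver.len()` under load.

```rust
/// Hypothetical helper: capacity from the peak queue depth seen under load.
/// `target_util` is the desired peak utilization (e.g. 0.75 for 70-80%).
fn tuned_capacity(peak_depth: usize, target_util: f64, safety: f64) -> usize {
    assert!(target_util > 0.0 && target_util <= 1.0);
    // Capacity at which the observed peak sits at the target utilization.
    let at_target = (peak_depth as f64 / target_util).ceil();
    // Apply the safety factor and keep at least one slot.
    ((at_target * safety).ceil() as usize).max(1)
}
```

For example, a peak depth of 60 with a 75% target gives 80, and the 2x safety factor brings it to 160.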
## Performance Impact
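| Strategy | Memory | CPU Overhead | Latency | Throughput |
|----------|--------|--------------|---------|------------|
| Unbounded | Unbounded | Low | Variable | High |
| Bounded | O(N) | Low | Bounded | Limited |
| Drop Oldest | O(N) | Low | Bounded | High |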
## See Also
- [Examples](./examples/backpressure_usage.rs) - Complete examples
- [Receiver API](./src/context/receiver.rs) - Implementation details
- [Stream Combinators](./src/stream.rs) - Throttle, debounce, window