# Cluster Example
This chapter demonstrates building a distributed RPC cluster with automatic worker discovery, load balancing, and failure detection using RpcNet's built-in cluster features.
## Architecture Overview
The cluster example showcases three main components working together:
```
               ┌──────────────────────────┐
               │         Director         │
               │    (Coordinator Node)    │
               │                          │
               │  - WorkerRegistry        │
               │  - ClusterClient         │
               │  - Load Balancing        │
               └────────────┬─────────────┘
                            │
                 Gossip Protocol (SWIM)
                            │
           ┌────────────────┼────────────────┐
           │                                 │
  ┌────────▼────────┐               ┌────────▼────────┐
  │    Worker A     │               │    Worker B     │
  │                 │               │                 │
  │ - Auto-join     │               │ - Auto-join     │
  │ - Tag: worker   │               │ - Tag: worker   │
  │ - Process tasks │               │ - Process tasks │
  └─────────────────┘               └─────────────────┘
```
### Components
**1. Director** - Coordinator node that:
- Uses `WorkerRegistry` for automatic worker discovery
- Uses `ClusterClient` for load-balanced request routing
- Employs `LeastConnections` strategy by default
- Monitors worker pool status
- Routes client requests to healthy workers
**2. Workers** - Processing nodes that:
- Join cluster automatically via gossip protocol
- Tag themselves with `role=worker` for discovery
- Process compute tasks from clients
- Monitor cluster events (node joined/left/failed)
- Support simulated failures for testing
**3. Client** - Application that:
- Connects to director
- Gets worker assignment
- Establishes direct connection to worker
- Handles failover automatically
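The client's overall control flow can be sketched as a loop: ask the director for a worker, talk to that worker directly, and fall back to the director if the worker fails. The sketch below shows only that flow; both helper functions are placeholders standing in for the example's actual RpcNet calls, not library APIs.

```rust
// Control-flow sketch only: both helpers are placeholders for the real RpcNet
// calls in the example's client binary, not library APIs.
async fn ask_director_for_worker(director_addr: &str) -> Result<String, String> {
    let _ = director_addr;            // real code: RPC to the director
    Ok("127.0.0.1:62001".to_string()) // pretend worker-a was assigned
}

async fn stream_from_worker(worker_addr: &str) -> Result<(), String> {
    let _ = worker_addr;              // real code: direct streaming RPC to the worker
    Ok(())
}

async fn run_client(director_addr: &str) -> Result<(), String> {
    loop {
        // 1. Ask the director for a load-balanced worker assignment.
        let worker_addr = ask_director_for_worker(director_addr).await?;
        // 2. Talk to that worker directly.
        match stream_from_worker(&worker_addr).await {
            Ok(()) => return Ok(()), // stream finished normally
            Err(_) => continue,      // worker failed: go back to the director
        }
    }
}
```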
## Why Use Built-in Cluster Features?
Compared to manual worker management patterns:
**Manual Approach** ❌:
- Custom `HashMap<Uuid, WorkerInfo>` for tracking
- Manual round-robin selection logic
- Explicit RPC calls for worker registration
- Custom ping-based health checks
- ~200 lines of boilerplate code
**Built-in Cluster** ✅:
- Built-in `WorkerRegistry` + `ClusterClient`
- Multiple load balancing strategies (Round Robin, Random, Least Connections)
- Automatic discovery via SWIM gossip protocol
- Phi Accrual failure detection (accurate, adaptive)
- ~50 lines to set up
- **75% code reduction!**
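For a sense of what the manual approach replaces, here is a rough sketch of the hand-rolled registry pattern. `WorkerInfo` and the round-robin logic are illustrative assumptions (a plain `u64` id stands in for the `Uuid` key to keep the sketch dependency-free), and the real manual pattern additionally needs registration RPCs and ping-based health checks.

```rust
use std::collections::HashMap;

// Illustrative stand-in for the manual bookkeeping described above.
struct WorkerInfo {
    addr: String,
    healthy: bool,
}

struct ManualRegistry {
    workers: HashMap<u64, WorkerInfo>,
    next: usize,
}

impl ManualRegistry {
    /// Hand-rolled round-robin over workers that passed the last ping.
    fn select(&mut self) -> Option<&WorkerInfo> {
        let healthy: Vec<&WorkerInfo> =
            self.workers.values().filter(|w| w.healthy).collect();
        if healthy.is_empty() {
            return None;
        }
        let idx = self.next % healthy.len();
        self.next = self.next.wrapping_add(1);
        Some(healthy[idx])
    }
}
```

With the built-in cluster, this bookkeeping (plus registration and health checking) is handled by `WorkerRegistry` and the gossip protocol, which is where the code reduction comes from.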
## Running the Example
### Prerequisites
Ensure test certificates exist:
```bash
ls certs/test_cert.pem certs/test_key.pem
```
All commands should be run from the **project root directory**.
### Basic Setup
Open four terminals and run each component:
**Terminal 1 - Director:**
```bash
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin director
```
**Terminal 2 - Worker A:**
```bash
WORKER_LABEL=worker-a \
WORKER_ADDR=127.0.0.1:62001 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
```
**Terminal 3 - Worker B:**
```bash
WORKER_LABEL=worker-b \
WORKER_ADDR=127.0.0.1:62002 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
```
**Terminal 4 - Client:**
```bash
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin client
```
### What You'll See
**Director Output:**
```
🎯 Starting Director at 127.0.0.1:61000
📁 Loading certificates from "../../certs/test_cert.pem"
✅ Director registered itself in cluster
✅ Cluster enabled - Director is now discoverable
🔄 Load balancing strategy: LeastConnections
📊 Worker pool status: 2 workers available
- worker-a at 127.0.0.1:62001 (0 connections)
- worker-b at 127.0.0.1:62002 (0 connections)
🚀 Director ready - listening on 127.0.0.1:61000
```
**Worker Output:**
```
👷 Starting Worker 'worker-a' at 127.0.0.1:62001
🔌 Binding server to 127.0.0.1:62001...
✅ Server bound successfully
🌐 Enabling cluster, connecting to director at 127.0.0.1:61000...
✅ Cluster enabled, connected to director
🏷️ Tagging worker with role=worker and label=worker-a...
✅ Worker 'worker-a' joined cluster with role=worker
🚀 Worker 'worker-a' is running and ready to handle requests
```
**Client Output:**
```
📡 Starting Client - connecting to director at 127.0.0.1:61000
✅ connected to director
🔀 director assigned worker - establishing direct connection
✅ direct connection established to worker
📤 creating request stream
🌊 stream opened successfully, starting to consume responses
📦 received token (sequence=1, text="token-1", total=1)
📦 received token (sequence=2, text="token-2", total=2)
...
```
## Testing Failure Scenarios
### Simulated Worker Failures
Enable periodic failures to test automatic failover:
**Worker with Failures:**
```bash
WORKER_LABEL=worker-a \
WORKER_ADDR=127.0.0.1:62001 \
DIRECTOR_ADDR=127.0.0.1:61000 \
WORKER_FAILURE_ENABLED=true \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
```
**Failure Cycle** (~18 seconds):
1. **Run**: 10 seconds of normal operation
2. **Warning**: "⚠️ Simulating worker failure in 3 seconds..."
3. **Failed**: 5 seconds in failed state - "💥 Worker failed!"
4. **Recovery**: "🔄 Worker recovering..."
5. **Ready**: "✅ Worker recovered and ready to serve!"
6. Repeat
**Client Behavior:**
- Detects failure via error response
- Returns to director for new worker assignment
- Switches to healthy worker seamlessly
- Streaming continues with minimal interruption
### Hard Kill Test
Test network-level failure detection:
```bash
# In a worker terminal, press Ctrl+C
```
**Observe:**
- Director detects failure via gossip protocol
- `WorkerRegistry` removes worker from pool
- Client requests automatically route to remaining workers
- Zero downtime for ongoing operations
### Worker Restart Test
After killing a worker, restart it to see re-discovery:
```bash
WORKER_LABEL=worker-a \
WORKER_ADDR=127.0.0.1:62001 \
DIRECTOR_ADDR=127.0.0.1:61000 \
RUST_LOG=info \
cargo run --manifest-path examples/cluster/Cargo.toml --bin worker
```
**Observe:**
- Worker automatically rejoins cluster
- Gossip spreads worker availability
- Director adds worker back to registry
- Client requests resume to all available workers
## How It Works
### 1. Automatic Discovery
Workers don't manually register - they just join the cluster:
```rust
// Worker code (simplified)
let cluster = ClusterMembership::new(config).await?;
cluster.join(vec![director_addr]).await?;
// Tag for discovery
cluster.set_tag("role", "worker");
cluster.set_tag("label", worker_label);
// That's it! Director discovers automatically via gossip
```
### 2. Load Balancing
Director uses `WorkerRegistry` for automatic load balancing:
```rust
// Director code
let registry = Arc::new(WorkerRegistry::new(
    cluster,
    LoadBalancingStrategy::LeastConnections,
));
registry.start().await;
// Automatically tracks workers and balances load
```
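With the registry in place, the director's per-request work reduces to picking a worker and handing its address back to the client, which then connects directly. A rough sketch of that step is below; the handler shape and the `addr` field on the selected worker are assumptions for illustration, while `select_worker` is the registry call shown in the tag-based routing section further down.

```rust
use rpcnet::cluster::WorkerRegistry;

// Sketch only: the handler wiring and the `addr` field are assumed; the real
// example integrates this into its RPC handlers.
async fn assign_worker(
    registry: &WorkerRegistry,
) -> Result<String, Box<dyn std::error::Error>> {
    // LeastConnections picks the least-loaded healthy worker tagged role=worker.
    let worker = registry.select_worker(Some("role=worker")).await?;
    // Hand the worker's address back so the client can connect directly.
    Ok(worker.addr.to_string())
}
```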
### 3. Failure Detection
Phi Accrual algorithm provides accurate health monitoring:
- Adapts to network conditions
- Distinguishes slow nodes from failed nodes
- Far fewer false positives from temporary delays
- Automatic recovery when nodes return
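The "phi" value is a suspicion level rather than a binary verdict: it estimates how unlikely it is that a heartbeat is merely late, given the history of observed inter-arrival times. A minimal sketch of the idea, assuming exponentially distributed heartbeat intervals (real implementations typically fit a windowed normal distribution instead):

```rust
// Minimal phi-accrual sketch: phi = -log10(P(heartbeat still arrives after
// `elapsed`)). With an exponential model, P_later(t) = exp(-t / mean), so phi
// grows the more overdue the heartbeat is relative to the learned mean.
fn phi(elapsed_ms: f64, mean_interval_ms: f64) -> f64 {
    let p_later = (-elapsed_ms / mean_interval_ms).exp();
    -p_later.log10()
}

fn main() {
    // Heartbeats normally arrive every ~1000 ms.
    let mean = 1000.0;
    for elapsed in [1000.0, 3000.0, 8000.0] {
        // phi ≈ 0.43 at 1 s, ≈ 1.30 at 3 s, ≈ 3.47 at 8 s.
        println!("elapsed={elapsed} ms, phi={:.2}", phi(elapsed, mean));
    }
    // A node is suspected once phi crosses a threshold; because `mean` is
    // learned from observed intervals, the detector adapts to slow networks.
}
```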
### 4. Tag-Based Routing
Filter workers by capabilities:
```rust
// Get only GPU workers
let gpu_worker = registry.select_worker(Some("gpu=true")).await?;
// Get any worker
let any_worker = registry.select_worker(Some("role=worker")).await?;
```
## Key Cluster Features Demonstrated
### ✅ Automatic Discovery
No manual registration needed - gossip protocol handles everything
### ✅ Load Balancing
Choose from:
- **Round Robin**: Even distribution
- **Random**: Stateless workload distribution
- **Least Connections**: Balance based on current load (recommended)
### ✅ Failure Detection
Phi Accrual algorithm provides accurate, adaptive health monitoring
### ✅ Tag-Based Routing
Route by worker capabilities (GPU, CPU, zone, etc.)
### ✅ Event Monitoring
Subscribe to cluster events:
- `NodeJoined` - New worker available
- `NodeLeft` - Worker gracefully departed
- `NodeFailed` - Worker detected as failed
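A sketch of how a node might react to these events is shown below. Only the three variant names come from the list above; the `subscribe()` method, the `ClusterEvent` type, and the event payloads are assumed shapes used purely for illustration.

```rust
// Illustrative only: subscribe(), ClusterEvent, and the payloads are assumed.
async fn watch_cluster(cluster: &ClusterMembership) {
    let mut events = cluster.subscribe();
    while let Some(event) = events.recv().await {
        match event {
            ClusterEvent::NodeJoined(node) => println!("worker available: {node:?}"),
            ClusterEvent::NodeLeft(node) => println!("worker departed: {node:?}"),
            ClusterEvent::NodeFailed(node) => println!("worker failed: {node:?}"),
        }
    }
}
```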
## Configuration Options
### Environment Variables
**Director:**
- `DIRECTOR_ADDR` - Bind address (default: `127.0.0.1:61000`)
- `RUST_LOG` - Log level (e.g., `info`, `debug`)
**Worker:**
- `WORKER_LABEL` - Worker identifier (default: `worker-1`)
- `WORKER_ADDR` - Bind address (default: `127.0.0.1:62001`)
- `DIRECTOR_ADDR` - Director address (default: `127.0.0.1:61000`)
- `WORKER_FAILURE_ENABLED` - Enable failure simulation (default: `false`)
- `RUST_LOG` - Log level
**Client:**
- `DIRECTOR_ADDR` - Director address (default: `127.0.0.1:61000`)
- `RUST_LOG` - Log level
### Load Balancing Strategies
```rust
use rpcnet::cluster::LoadBalancingStrategy;
// Options:
LoadBalancingStrategy::RoundRobin // Even distribution
LoadBalancingStrategy::Random // Random selection
LoadBalancingStrategy::LeastConnections // Pick least loaded (recommended)
```
### Cluster Configuration
```rust
use std::time::Duration;
use rpcnet::cluster::ClusterConfig;

let config = ClusterConfig::default()
    .with_gossip_interval(Duration::from_secs(1))
    .with_health_check_interval(Duration::from_secs(2));
```
## Troubleshooting
**Workers not discovered:**
- Ensure director starts first (it's the seed node)
- Check firewall allows UDP for gossip
- Verify workers connect to correct director address
**Requests failing:**
- Check worker has `role=worker` tag
- Verify compute handler is registered
- Check logs for connection errors
**Slow failover:**
- Adjust health check interval in config
- Tune Phi Accrual threshold
- Check network latency
## Production Considerations
For production deployments:
1. **TLS Certificates**: Use proper certificates, not test certs
2. **Monitoring**: Integrate cluster events with your monitoring system
3. **Scaling**: Add more workers dynamically as needed
4. **Persistence**: Consider persisting cluster state if needed
5. **Security**: Add authentication and authorization
6. **Network**: Plan for network partitions and split-brain scenarios
## Next Steps
- Try different load balancing strategies
- Add more workers dynamically
- Test network partition scenarios
- Add custom tags for routing (zone, GPU, etc.)
- Integrate with your application logic
For full source code, see `examples/cluster/` in the repository.