# photon-etcd-cluster
[crates.io](https://crates.io/crates/photon-etcd-cluster) · [docs.rs](https://docs.rs/photon-etcd-cluster) · [License: MIT](https://opensource.org/licenses/MIT)
A lightweight Rust library for cluster coordination using etcd. Provides leader election and a node registry with minimal dependencies and no platform lock-in (unlike Kubernetes-native solutions).
## Why photon-etcd-cluster?
- **Platform-agnostic**: Works anywhere etcd runs - bare metal, VMs, containers, or cloud
- **Minimal dependencies**: Only etcd required, no Kubernetes or other orchestrators
- **Reactive API**: Event-driven with broadcast channels and watch streams - no polling required
- **Lock-free reads**: O(1) access to cluster state via `watch::Receiver::borrow()`
- **Node Metrics**: Optional system metrics collection (CPU, memory, load)
## Use Cases
- Organize distributed workers into logical groups
- Elect a single leader per group for coordination tasks (cache invalidation, job scheduling)
- Dynamic service discovery for load balancers
- Health monitoring with automatic failure detection
- **Weighted load balancing** based on real-time node metrics (CPU, memory, queue depth)
## Quick Start
Add to your `Cargo.toml`:
```toml
[dependencies]
photon-etcd-cluster = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
```
For system metrics collection (CPU, memory, load average), enable the `system-metrics` feature:
```toml
[dependencies]
photon-etcd-cluster = { version = "0.1", features = ["system-metrics"] }
```
### Worker Process (ClusterNode)
```rust
use photon_etcd_cluster::ClusterNode;
use photon_etcd_cluster::HealthStatus;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (shutdown_tx, _) = broadcast::channel(1);

    let node = ClusterNode::new(
        vec!["http://localhost:2379".to_string()],
        "node-1".to_string(),
        "192.168.1.10".parse()?,
        "my-service".to_string(),
        Some(5), // TTL in seconds
    );

    // Run node in background
    let n = node.clone();
    let mut shutdown_rx = shutdown_tx.subscribe();
    tokio::spawn(async move {
        n.run(&mut shutdown_rx).await
    });

    // Use in your application
    if node.is_leader() {
        println!("I am the leader!");
        // Perform leader-only tasks
    }

    match node.current_health() {
        HealthStatus::Healthy => println!("Connected to etcd"),
        HealthStatus::Unhealthy => println!("Connection issues"),
        HealthStatus::Unknown => println!("Initializing..."),
    }

    Ok(())
}
```
### Load Balancer / Service Discovery
```rust
use photon_etcd_cluster::ServiceDiscovery;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let discovery = ServiceDiscovery::new(
        vec!["http://localhost:2379".to_string()],
        "my-service".to_string(),
    );

    // Run discovery in background
    let d = discovery.clone();
    tokio::spawn(async move { d.run(None).await });

    // Wait for initial sync (event-driven, no polling)
    discovery.wait_ready().await;

    // Query nodes (lock-free, O(1))
    for node in discovery.nodes().iter() {
        println!("Node {} at {}", node.id, node.ip);
    }

    // Get current leader
    if let Some(leader) = discovery.leader() {
        println!("Current leader: {}", leader.id);
    }

    Ok(())
}
```
### Event-Driven Updates (Recommended)
React to cluster changes as they happen using broadcast events:
```rust
use photon_etcd_cluster::{ServiceDiscovery, ClusterEvent};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let discovery = ServiceDiscovery::new(
        vec!["http://localhost:2379".to_string()],
        "my-service".to_string(),
    );

    // Subscribe to events BEFORE running
    let mut events = discovery.subscribe();

    // Run discovery in background
    let d = discovery.clone();
    tokio::spawn(async move { d.run(None).await });

    // React to cluster changes
    loop {
        match events.recv().await {
            Ok(ClusterEvent::Ready) => {
                println!("Initial sync complete");
            }
            Ok(ClusterEvent::NodeJoined(n)) => {
                println!("Node joined: {} at {}", n.id, n.ip);
                // Update load balancer backends
            }
            Ok(ClusterEvent::NodeLeft(n)) => {
                println!("Node left: {}", n.id);
                // Remove from backend pool
            }
            Ok(ClusterEvent::LeaderElected(n)) => {
                println!("New leader: {}", n.id);
                // Route writes to new leader
            }
            Ok(ClusterEvent::LeaderLost) => {
                println!("Leader lost, awaiting election");
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                println!("Missed {} events, refreshing state", n);
                // Re-sync from discovery.nodes()
            }
            Err(_) => break,
            _ => {}
        }
    }

    Ok(())
}
```
### Watch-Based Metrics (Efficient State Observation)
Use watch channels for metrics or state observation - more efficient than polling:
```rust
use photon_etcd_cluster::ServiceDiscovery;
// NODE_GAUGE is assumed to be a metrics gauge registered elsewhere
// (e.g. a prometheus IntGauge).
async fn update_metrics(discovery: ServiceDiscovery) {
    let mut watch = discovery.watch_nodes();

    // Set initial value
    let count = watch.borrow().len();
    NODE_GAUGE.set(count as i64);

    // React only when state changes (no polling!)
    while watch.changed().await.is_ok() {
        let count = watch.borrow().len();
        NODE_GAUGE.set(count as i64);
    }
}
```
### Node Metrics for Weighted Load Balancing
Nodes can report system metrics (CPU, memory, load average) that load balancers can use for weighted traffic distribution. Enable the `system-metrics` feature and use `ClusterNodeBuilder`:
```rust
use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
use tokio::sync::broadcast;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (shutdown_tx, _) = broadcast::channel(1);

    // Create node with system metrics collection
    let node = ClusterNodeBuilder::new(
        vec!["http://localhost:2379".to_string()],
        "worker-1".to_string(),
        "192.168.1.10".parse()?,
        "workers".to_string(),
    )
    .ttl(5)
    .metrics_collector(SystemMetricsCollector::new())
    .metrics_update_interval(5) // Update metrics every 5 seconds
    .build();

    // Run node in background
    let n = node.clone();
    let mut shutdown_rx = shutdown_tx.subscribe();
    tokio::spawn(async move {
        n.run(&mut shutdown_rx).await
    });

    Ok(())
}
```
#### Custom Metrics Collector
Implement `MetricsCollector` trait for application-specific metrics:
```rust
use photon_etcd_cluster::{ClusterNodeBuilder, MetricsCollector, NodeMetadata};
use serde_json::json;
struct AppMetricsCollector {
    // Your app state
}

impl MetricsCollector for AppMetricsCollector {
    fn collect(&self) -> NodeMetadata {
        json!({
            "cpu_usage_percent": 45.0,
            "memory_usage_percent": 60.0,
            "queue_depth": 150,
            "active_connections": 42,
            "requests_per_second": 1000.0
        })
    }
}

let node = ClusterNodeBuilder::new(endpoints, id, ip, group)
    .metrics_collector(AppMetricsCollector { /* ... */ })
    .metrics_update_interval(5)
    .build();
```
#### Reading Node Metrics (Load Balancer Side)
```rust
use photon_etcd_cluster::{ServiceDiscovery, ClusterEvent, metadata_keys};
let discovery = ServiceDiscovery::new(endpoints, "workers".into());

// React to metric changes
let mut events = discovery.subscribe();
while let Ok(event) = events.recv().await {
    match event {
        ClusterEvent::NodeUpdated { old, new } => {
            // Metadata changed - update backend weights
            if let Some(cpu) = new.metadata.get(metadata_keys::CPU_USAGE_PERCENT) {
                println!("Node {} CPU: {}%", new.id, cpu);
            }
        }
        _ => {}
    }
}

// Or query directly
for node in discovery.nodes().iter() {
    let cpu = node.metadata.get("cpu_usage_percent")
        .and_then(|v| v.as_f64())
        .unwrap_or(0.0);
    let mem = node.metadata.get("memory_usage_percent")
        .and_then(|v| v.as_f64())
        .unwrap_or(0.0);
    println!("Node {}: CPU={:.1}%, Memory={:.1}%", node.id, cpu, mem);
}
```
#### Standard Metadata Keys
The `metadata_keys` module provides standard key names:
| Key | Type | Description |
|---|---|---|
| `cpu_usage_percent` | `f64` | CPU usage (0-100%) |
| `memory_usage_percent` | `f64` | Memory usage (0-100%) |
| `memory_available_bytes` | `u64` | Available memory in bytes |
| `load_avg_1m` | `f64` | 1-minute load average |
| `active_connections` | `u32` | Active connection count |
| `requests_per_second` | `f64` | Request throughput |
| `queue_depth` | `u32` | Pending work queue size |
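As a sketch of weighted load balancing on top of these keys (the port, the fallback value, and the inverse-CPU formula are illustrative assumptions, not part of the library):
```rust
use photon_etcd_cluster::ServiceDiscovery;
use std::net::SocketAddr;

/// Derive (address, weight) pairs from each node's reported CPU usage.
fn weighted_backends(discovery: &ServiceDiscovery) -> Vec<(SocketAddr, u32)> {
    discovery
        .nodes()
        .iter()
        .map(|node| {
            let cpu = node
                .metadata
                .get("cpu_usage_percent")
                .and_then(|v| v.as_f64())
                .unwrap_or(50.0); // neutral fallback when no metrics are reported
            // Inverse-CPU weighting: an idle node gets ~100, a saturated one gets 1.
            let weight = (100.0 - cpu).max(1.0) as u32;
            (SocketAddr::new(node.ip, 8080), weight) // port 8080 is illustrative
        })
        .collect()
}
```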
### Real-World Example: HTTP Load Balancer with Dynamic Backends
This example shows how to build a load balancer that automatically discovers backend servers using `ServiceDiscovery`. Based on actual production usage with [Pingora](https://github.com/cloudflare/pingora).
```rust
use photon_etcd_cluster::ServiceDiscovery;
use std::net::SocketAddr;

/// Manages dynamic backend discovery for a load balancer
struct BackendDiscovery {
    discovery: ServiceDiscovery,
}

impl BackendDiscovery {
    fn new(discovery: ServiceDiscovery) -> Self {
        Self { discovery }
    }

    /// Returns healthy backend addresses for load balancing.
    /// Called on every incoming request - must be fast!
    fn get_backends(&self) -> Vec<SocketAddr> {
        // Lock-free O(1) read - safe for high-frequency calls
        self.discovery
            .nodes()
            .iter()
            .map(|node| SocketAddr::new(node.ip, 8080))
            .collect()
    }

    /// Optionally route leader-only requests (e.g. write operations)
    fn get_leader_backend(&self) -> Option<SocketAddr> {
        self.discovery
            .leader()
            .map(|leader| SocketAddr::new(leader.ip, 8080))
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rt = tokio::runtime::Runtime::new()?;

    // Create discovery for different service groups
    let etcd_endpoints = vec!["http://localhost:2379".to_string()];

    let workers_discovery = ServiceDiscovery::new(
        etcd_endpoints.clone(),
        "workers".to_string(), // Group: backend workers
    );
    let cache_discovery = ServiceDiscovery::new(
        etcd_endpoints.clone(),
        "cache-nodes".to_string(), // Group: cache servers
    );

    // Spawn discovery background tasks
    rt.block_on(async {
        let w = workers_discovery.clone();
        let c = cache_discovery.clone();
        tokio::spawn(async move { w.run(None).await });
        tokio::spawn(async move { c.run(None).await });

        // Wait for at least 1 backend (built-in, no polling!)
        workers_discovery.wait_for_nodes(1).await;
        cache_discovery.wait_for_nodes(1).await;
    });

    // Create backend discovery instances for load balancer
    let worker_backends = BackendDiscovery::new(workers_discovery.clone());
    let cache_backends = BackendDiscovery::new(cache_discovery.clone());

    // Use in your load balancer's request routing
    println!("Worker backends: {:?}", worker_backends.get_backends());
    println!("Cache backends: {:?}", cache_backends.get_backends());

    if let Some(leader) = worker_backends.get_leader_backend() {
        println!("Leader backend for writes: {}", leader);
    }

    Ok(())
}
```
Key patterns demonstrated:
- **Multiple service groups**: Separate discovery instances for different backend types (workers, cache, etc.)
- **Lock-free reads**: `discovery.nodes()` is safe to call on every request
- **Leader routing**: Route write operations to the elected leader
- **Built-in wait helpers**: `wait_for_nodes()` uses watch channels internally - no polling
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│                        etcd Cluster                          │
│   ┌──────────────────┐          ┌──────────────────┐         │
│   │ registry/{group} │          │ election/{group} │         │
│   │   /node-1        │          │   (leader key)   │         │
│   │   /node-2        │          │                  │         │
│   │   /node-N        │          │                  │         │
│   └──────────────────┘          └──────────────────┘         │
└─────────────────────────────────────────────────────────────┘
             ▲                             ▲
             │ watch                       │ campaign/proclaim
             │                             │
  ┌──────────┴──────────┐          ┌───────┴────────┐
  │  ServiceDiscovery   │          │  ClusterNode   │
  │  - subscribe()      │          │  - run()       │
  │  - watch_nodes()    │          │  - is_leader() │
  │  - nodes()          │          │  - health      │
  │  - leader()         │          │                │
  └─────────────────────┘          └────────────────┘
```
### Components
| Component | Purpose | Used by |
|---|---|---|
| **ClusterNode** | Node registration, leader election, health management | Worker processes |
| **ClusterNodeBuilder** | Fluent builder for ClusterNode with metrics configuration | Worker processes |
| **ServiceDiscovery** | Reactive cluster state via events and watch channels | Load balancers |
| **ClusterEvent** | Enum for cluster state changes (join/leave/leader/updated) | Event subscribers |
| **Node** | Serializable node data (id, ip, last_seen, metadata) | Both |
| **MetricsCollector** | Trait for collecting node metrics | Worker processes |
| **SystemMetricsCollector** | Built-in collector for CPU, memory, load (requires `system-metrics` feature) | Worker processes |
## Features
### Leader Election
Uses etcd's native `campaign`/`proclaim` APIs for distributed consensus:
- **Campaign**: Blocks until leadership is acquired
- **Proclaim**: Periodic heartbeat to maintain leadership
- **Resign**: Graceful leadership handover on shutdown
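As an illustration, a minimal sketch of leader-gated work using only the `is_leader()` accessor shown in the Quick Start (the 30-second interval and the task body are placeholders; tokio's `time` feature is assumed):
```rust
use std::time::Duration;
use photon_etcd_cluster::ClusterNode;

/// Run a periodic coordination task, but only while this node holds leadership.
async fn leader_maintenance(node: ClusterNode) {
    // Requires tokio's "time" feature.
    let mut ticker = tokio::time::interval(Duration::from_secs(30));
    loop {
        ticker.tick().await;
        if node.is_leader() {
            // Leader-only work, e.g. cache invalidation or job scheduling.
            println!("running leader-only maintenance");
        }
    }
}
```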
### Health Monitoring
Heartbeat-based health tracking:
- The node is marked unhealthy after 3 consecutive lease keep-alive failures
- Reconnection is triggered after 10 consecutive failures
- Automatic recovery once connectivity is restored
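For example, an application could gate its own readiness reporting on this state. A sketch (the readiness wiring itself is an assumption; only `current_health()` and `HealthStatus` come from the API above):
```rust
use photon_etcd_cluster::{ClusterNode, HealthStatus};

/// Report ready only while the node's etcd connection is considered healthy.
fn is_ready(node: &ClusterNode) -> bool {
    matches!(node.current_health(), HealthStatus::Healthy)
}
```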
### Resilient Connectivity
- Exponential backoff on etcd connection failures (1s → 30s max)
- Graceful reconnection after network partitions
- Shutdown signal integration for clean termination
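For example, the shutdown channel from the Quick Start can be wired to Ctrl-C. A sketch assuming the channel carries `()` and that tokio's `signal` feature is enabled:
```rust
use tokio::sync::broadcast;

/// Broadcast a shutdown signal to `ClusterNode::run` when Ctrl-C is received.
async fn shutdown_on_ctrl_c(shutdown_tx: broadcast::Sender<()>) {
    // Requires tokio's "signal" feature.
    if tokio::signal::ctrl_c().await.is_ok() {
        // Subscribers (e.g. the node's run loop) observe this and exit cleanly.
        let _ = shutdown_tx.send(());
    }
}
```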
### Node Metrics (Optional)
Nodes can publish arbitrary JSON metadata (CPU, memory, custom metrics) for load balancer consumption:
- **Schema-less**: Any JSON-serializable data via `serde_json::Value`
- **Pluggable collection**: Implement `MetricsCollector` trait for custom metrics
- **Built-in system metrics**: `SystemMetricsCollector` provides CPU, memory, load average (requires `system-metrics` feature)
- **Separate update task**: Metrics updates run independently from lease keep-alive
- **Change detection**: `NodeUpdated` events emitted when metadata changes
### Reactive API
ServiceDiscovery provides three ways to observe cluster state:
- **Event subscription** via `subscribe()`: Push-based `ClusterEvent` notifications
- **Watch channels** via `watch_nodes()`/`watch_leader()`: Efficient state observation (sketch below)
- **Direct access** via `nodes()`/`leader()`: O(1) lock-free reads
Events emitted:
- `NodeJoined(Node)` / `NodeLeft(Node)` / `NodeUpdated { old, new }`
- `LeaderElected(Node)` / `LeaderLost`
- `Ready` / `Disconnected` / `Reconnected`
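A minimal sketch of the watch-channel flavor for leadership, assuming `watch_leader()` returns a `tokio::sync::watch::Receiver<Option<Node>>` mirroring `watch_nodes()`:
```rust
use photon_etcd_cluster::ServiceDiscovery;

/// Log leadership changes as they happen, without polling.
async fn log_leader_changes(discovery: ServiceDiscovery) {
    let mut leader_watch = discovery.watch_leader();
    while leader_watch.changed().await.is_ok() {
        match leader_watch.borrow().as_ref() {
            Some(leader) => println!("leader is now {}", leader.id),
            None => println!("no leader elected at the moment"),
        }
    }
}
```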
## etcd Key Structure
```
registry/
└── {group_name}/
├── node-1 → {"id":"node-1","ip":"192.168.1.10","last_seen":1234567890,"metadata":{...}}
├── node-2 → {"id":"node-2","ip":"192.168.1.11","last_seen":1234567891,"metadata":{...}}
└── ...
election/
└── {group_name} → (etcd election key, value = current leader ID)
```
Node metadata example (with `SystemMetricsCollector`):
```json
{
"id": "worker-1",
"ip": "192.168.1.10",
"last_seen": 1234567890,
"metadata": {
"cpu_usage_percent": 45.2,
"memory_usage_percent": 62.8,
"memory_available_bytes": 8589934592,
"load_avg_1m": 2.5
}
}
```
## Configuration
### ClusterNode / ClusterNodeBuilder
| Parameter | Type | Default | Description |
|---|---|---|---|
| `etcd_endpoints` | `Vec<String>` | Required | etcd cluster endpoints |
| `node_id` | `String` | Required | Unique node identifier |
| `node_ip` | `IpAddr` | Required | Node's IP address |
| `group_name` | `String` | Required | Logical group for nodes |
| `ttl` | `i64` | `5` | Lease TTL in seconds |
| `metrics_collector` | `impl MetricsCollector` | `NoopMetricsCollector` | Metrics collection implementation |
| `metrics_update_interval` | `u64` | `0` (disabled) | Seconds between metrics updates |
## Performance
- **ServiceDiscovery::nodes()**: O(1) lock-free read via `watch::Receiver::borrow()`
- **Throughput**: 10M accesses in <2s (benchmark validated)
- **Memory**: ~5 MiB for 1000 nodes
- **Event channel**: 256-message buffer for broadcast subscribers
## Requirements
- Rust 1.85+ (edition 2024)
- etcd v3.5+
- Docker (for integration tests only)
Tested with etcd v3.5.21.
## Build System
```bash
# Cargo
cargo build
cargo test
```
## Testing
```bash
# Unit tests
cargo test
# Integration tests (requires Docker - spawns etcd container)
cargo test -- --test-threads=1
```
### Integration Test Coverage
- Single node self-election
- Leader re-election on failure
- Node reconnection after etcd restarts
- Scalability with 20+ concurrent nodes
- Node metadata storage and retrieval
- Metadata update propagation via `NodeUpdated` events
## Comparison with Alternatives
| Feature | photon-etcd-cluster | Kubernetes-based (Rust) | P2P-based (Go) | P2P-based (Rust) |
|---|---|---|---|---|
| Language | Rust | Rust | Go | Rust |
| Backend | etcd | Kubernetes | None (P2P) | None (P2P) |
| Leader Election | Yes | Yes | No | No |
| Node Registry | Yes | No | Yes | Yes |
| Node Metrics | Yes | No | Limited | Limited |
| Platform Independent | Yes | No (K8s only) | Yes | Yes |
| External Dependencies | etcd | Kubernetes | None | None |
## Contributing
Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines.
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Author
Roman Gushel
## Roadmap
- [ ] Leader Priority/Weighting
- [ ] Graceful Leadership Transfer
- [ ] Node Tagging & Filtering
- [ ] TLS/mTLS Support for etcd Connections
- [ ] etcd Authentication Support
- [ ] Graceful Degradation & Health Check States
- [ ] Application-Level Health Checks
- [ ] Topology/Zone Awareness
- [ ] Observability: Prometheus Metrics
- [ ] Circuit Breaker for etcd Operations