memberlist-plumtree
Plumtree (Epidemic Broadcast Trees) implementation built on top of memberlist for efficient O(n) message broadcast in distributed systems.
Overview
Plumtree combines the efficiency of tree-based broadcast with the reliability of gossip protocols through a hybrid eager-push/lazy-push approach. This makes it ideal for applications requiring reliable message dissemination across large clusters with minimal network overhead.
Key Features
- O(n) Message Complexity: Messages traverse a spanning tree, eliminating redundant transmissions
- Self-Healing: Automatic tree repair when nodes fail or network partitions occur
- Protocol Optimization: Dynamic eager/lazy peer management for optimal broadcast paths
- Rate Limiting: Built-in per-peer rate limiting with token bucket algorithm
- Exponential Backoff: Configurable retry logic with exponential backoff for Graft requests
- Runtime Agnostic: Works with Tokio, async-std, or other async runtimes
- Zero-Copy: Efficient message handling using `bytes::Bytes`
- RTT-Based Topology: Peer scoring based on latency for optimal tree construction
- Connection Pooling: Efficient connection management with per-peer queues
- Adaptive Batching: Dynamic IHave batch sizing based on network conditions
- Dynamic Cleanup Tuning: Lock-free rate tracking with adaptive cleanup intervals
- Lazarus Seed Recovery: Automatic reconnection to restarted seed nodes
- Peer Persistence: Save known peers to disk for crash recovery
How Plumtree Works
Key Concept: Each Node Has Its Own View
Every node maintains its own classification of peers as "eager" or "lazy". There is no global tree - the spanning tree emerges from each node's local decisions.
┌─────────────────────────────────────────────────────────────────┐
│ Node A's Local View │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [Node A] │
│ / | \ │
│ / | \ │
│ eager eager lazy │
│ / | \ │
│ [B] [C] [D] │
│ │
│ A sends GOSSIP (full message) to B and C │
│ A sends IHAVE (just message ID) to D │
│ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Node B's Local View │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [Node B] │
│ / | \ │
│ / | \ │
│ eager lazy eager │
│ / | \ │
│ [A] [C] [E] │
│ │
│ B has its OWN classification - different from A's view! │
│ B sends GOSSIP to A and E, IHAVE to C │
│ │
└─────────────────────────────────────────────────────────────────┘
Message Propagation Example
When Node A broadcasts a message, here's how it flows:
Step 1: A originates message
A ──GOSSIP──> B (A's eager peer)
A ──GOSSIP──> C (A's eager peer)
A ──IHAVE───> D (A's lazy peer)
Step 2: B receives, forwards to its eager peers
B ──GOSSIP──> E (B's eager peer)
B ──IHAVE───> C (B's lazy peer, but C already has it)
Step 3: Tree optimization
C received from both A and B (redundant!)
C sends PRUNE to B → B demotes C to lazy
Result: More efficient tree for next message
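The prune decision in Step 3 comes down to a simple rule: if a GOSSIP arrives for a message the node has already seen, the link it came over is redundant. A minimal sketch of that rule (illustrative only, not the crate's internal code):

```rust
use std::collections::HashSet;

// Returns true when the receiver should reply with PRUNE: the GOSSIP was a
// duplicate, so the link it arrived on is redundant.
fn should_prune(seen: &mut HashSet<u64>, msg_id: u64) -> bool {
    // HashSet::insert returns false if the ID was already present
    !seen.insert(msg_id)
}
```

The first copy of a message is delivered and forwarded to the node's own eager peers; later copies trigger PRUNE, and the sender demotes that link to lazy.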
Lazy Peer Behavior: Wait, Don't Pull Immediately
When a lazy peer receives IHAVE, it does NOT immediately request the message. Instead, it waits to see if the message arrives through its eager connections:
┌─────────────────────────────────────────────────────────────────┐
│ What happens when D receives IHAVE from A │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. A ──IHAVE(msg_id)──> D │
│ │
│ 2. D starts a timer (graft_timeout, default 500ms) │
│ D thinks: "I'll wait to see if I get this from someone else"│
│ │
│ 3a. IF D receives GOSSIP from another peer before timeout: │
│ ✓ D cancels timer, does nothing │
│ (D got message through its eager connections - normal case)│
│ │
│ 3b. IF timeout expires and D still doesn't have message: │
│ ! D ──GRAFT(msg_id)──> A │
│ ! A ──GOSSIP(msg)──> D (and A promotes D to eager) │
│ (Tree repair - D joins A's eager set for future messages) │
│ │
└─────────────────────────────────────────────────────────────────┘
Example: D has its own eager peer (B)
[A] [B]
│ │
(lazy) (eager)
│ │
└──IHAVE──> [D] <──GOSSIP──┘
│
D receives from B first!
IHAVE from A is ignored (backup not needed)
Why wait instead of immediate pull?
| Approach | Messages | Latency | Used By |
|---|---|---|---|
| Immediate pull on IHAVE | 3 messages per delivery (IHAVE + GRAFT + GOSSIP) | Higher | - |
| Wait then pull if needed | Usually just GOSSIP via eager | Lower | Plumtree |
The IHAVE/GRAFT mechanism is a backup path, not the primary delivery. This is what makes Plumtree achieve O(n) message complexity - most messages flow through the eager spanning tree, and lazy links only activate when needed.
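A sketch of this wait-then-pull logic, assuming a Tokio timer and hypothetical `have_message` / `send_graft` callbacks (the crate's internals will differ):

```rust
use std::time::Duration;

// On IHAVE(msg_id), a lazy peer waits before pulling, giving the eager
// spanning tree a chance to deliver the payload first.
async fn on_ihave(
    msg_id: u64,
    graft_timeout: Duration,
    have_message: impl Fn(u64) -> bool,
    send_graft: impl Fn(u64),
) {
    tokio::time::sleep(graft_timeout).await;
    if !have_message(msg_id) {
        // Timeout expired and the message is still missing: repair the tree.
        send_graft(msg_id); // the sender replies with GOSSIP and promotes us to eager
    }
}
```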
Protocol Messages
| Message | When Sent | Purpose |
|---|---|---|
| GOSSIP | To eager peers | Full message payload - primary delivery |
| IHAVE | To lazy peers | "I have message X" - backup announcement |
| GRAFT | When missing message | "Send me message X, add me as eager" |
| PRUNE | On duplicate receipt | "I got this already, demote me to lazy" |
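Conceptually, the four message types can be pictured as a small enum; this illustrates the protocol shape only and is not the crate's actual wire format:

```rust
use bytes::Bytes;

enum PlumtreeMessage<Id> {
    /// Full payload, pushed to eager peers (primary delivery path).
    Gossip { id: Id, payload: Bytes, round: u32 },
    /// Batched announcements of message IDs, sent to lazy peers.
    IHave { ids: Vec<Id> },
    /// "Send me this message and treat me as eager from now on."
    Graft { id: Id },
    /// "I already had that message; demote me to lazy."
    Prune,
}
```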
Why This Works
- Eager Push: Full messages flow through spanning tree edges (eager peers)
- Lazy Push: IHave announcements provide redundant paths for reliability
- Graft: Lazy peers can request messages they missed → repairs tree
- Prune: Redundant paths are removed → tree converges to efficient structure
Installation
Add to your Cargo.toml:
[dependencies]
memberlist-plumtree = "0.1"
Feature Flags
| Feature | Description |
|---|---|
| `tokio` | Enable Tokio runtime support (required for the Lazarus background task) |
| `metrics` | Enable metrics collection via the `metrics` crate |
| `serde` | Enable serialization/deserialization for config |
Quick Start
use memberlist_plumtree::{Plumtree, PlumtreeConfig, PlumtreeDelegate};
use bytes::Bytes;
use std::sync::Arc;

// Define a delegate to handle delivered messages
// (see "Delegate Callbacks" below for an illustrative impl)
struct MyDelegate;

#[tokio::main]
async fn main() {
    // Build a PlumtreeConfig, construct the instance with Arc::new(MyDelegate),
    // then broadcast; the sections below show each call in detail.
}
Configuration
Preset Configurations
// For LAN environments (low latency, high bandwidth)
let config = PlumtreeConfig::lan();

// For WAN environments (higher latency tolerance)
let config = PlumtreeConfig::wan();

// For large clusters (1000+ nodes)
let config = PlumtreeConfig::large_cluster();
Custom Configuration
use std::time::Duration;

// Values shown are the documented defaults (see the table below)
let config = PlumtreeConfig::default()
    .with_eager_fanout(3)                             // Number of eager peers
    .with_lazy_fanout(6)                              // Number of lazy peers for IHave
    .with_ihave_interval(Duration::from_millis(100))
    .with_graft_timeout(Duration::from_millis(500))
    .with_message_cache_ttl(Duration::from_secs(60))
    .with_message_cache_max_size(10_000)
    .with_optimization_threshold(3)
    .with_max_message_size(64 * 1024)
    .with_graft_rate_limit_per_second(10.0)
    .with_graft_rate_limit_burst(20)
    .with_graft_max_retries(5);
Configuration Parameters
| Parameter | Default | Description |
|---|---|---|
| `eager_fanout` | 3 | Number of peers receiving full messages immediately |
| `lazy_fanout` | 6 | Number of peers receiving IHave announcements |
| `ihave_interval` | 100ms | Interval for batched IHave announcements |
| `message_cache_ttl` | 60s | How long to cache messages for Graft requests |
| `message_cache_max_size` | 10000 | Maximum number of cached messages |
| `optimization_threshold` | 3 | Rounds before pruning redundant paths |
| `graft_timeout` | 500ms | Timeout before sending a Graft for a missing message |
| `max_message_size` | 64KB | Maximum message payload size |
| `graft_rate_limit_per_second` | 10.0 | Rate limit for Graft requests per peer |
| `graft_rate_limit_burst` | 20 | Burst capacity for Graft rate limiting |
| `graft_max_retries` | 5 | Maximum Graft retry attempts with backoff |
Advanced Features
Peer Scoring (RTT-Based Topology)
Peer scoring enables latency-aware peer selection for optimal spanning tree construction:
use memberlist_plumtree::PeerScoring; // type name illustrative; check the crate docs
use std::time::Duration;

// Peers are scored based on RTT and reliability (argument shapes illustrative)
let scoring = PeerScoring::new();

// Record an RTT measurement
scoring.record_rtt(peer_id, Duration::from_millis(12));

// Record failures (affects peer ranking)
scoring.record_failure(peer_id);

// Get best peers for the eager set (sorted by score)
let best_peers = scoring.best_peers(3);
Connection Pooling
Efficient connection management with per-peer message queues:
use memberlist_plumtree::{PoolConfig, PooledTransport}; // type names illustrative; check the crate docs
use std::time::Duration;

// Values and argument shapes are illustrative
let config = PoolConfig::default()
    .with_queue_size(1024)   // Per-peer queue capacity
    .with_batch_size(32)     // Messages per batch
    .with_flush_interval(Duration::from_millis(10));

let transport = PooledTransport::new(config, inner_transport);

// Messages are automatically batched and sent efficiently
transport.send(peer_id, message).await?;

// Get pool statistics
let stats = transport.stats();
println!("{:?}", stats);
Adaptive IHave Batching
Dynamic batch size adjustment based on network conditions:
use memberlist_plumtree::AdaptiveIHaveBatcher; // type name illustrative; check the crate docs

let batcher = AdaptiveIHaveBatcher::new();

// Record network feedback
batcher.record_message();
batcher.record_graft_received(); // Successful graft
batcher.record_graft_timeout();  // Failed graft

// Get recommended batch size (adjusts based on latency/success rate)
let batch_size = batcher.recommended_batch_size();

// Scale for cluster size (value illustrative)
batcher.set_cluster_size_hint(1000);

// Get statistics
let stats = batcher.stats();
println!("{:?}", stats);
Dynamic Cleanup Tuning
Lock-free rate tracking with adaptive cleanup intervals:
use memberlist_plumtree::CleanupTuner; // type name illustrative; check the crate docs

let tuner = CleanupTuner::new();

// Record messages (fully lock-free, uses atomics only)
tuner.record_message();
tuner.record_messages(64); // Batch recording for high throughput (count illustrative)

// Get tuned cleanup parameters based on current state
let params = tuner.get_parameters();
println!("{:?}", params);

// Check backpressure hint (variant names depend on the crate)
match params.backpressure_hint {
    _ => { /* e.g. slow down broadcasts under pressure */ }
}

// Get statistics with trend analysis
let stats = tuner.stats();
println!("{:?}", stats);
Sharded Seen Map
High-performance deduplication with configurable sharding:
// Seen map is automatically managed by Plumtree
// Configure capacity limits in PlumtreeConfig
let config = PlumtreeConfig::default()
    .with_seen_map_soft_cap(100_000)   // Soft limit for entries (value illustrative)
    .with_seen_map_hard_cap(200_000);  // Hard limit (emergency eviction)

// Get seen map statistics
let stats = plumtree.seen_map_stats();
println!("{:?}", stats);
Metrics
When compiled with the metrics feature, comprehensive metrics are exposed:
Counters
- `plumtree_messages_broadcast_total` - Total broadcasts initiated
- `plumtree_messages_delivered_total` - Total messages delivered
- `plumtree_messages_duplicate_total` - Duplicate messages received
- `plumtree_gossip_sent_total` - Gossip messages sent
- `plumtree_ihave_sent_total` - IHave messages sent
- `plumtree_graft_sent_total` - Graft messages sent
- `plumtree_prune_sent_total` - Prune messages sent
- `plumtree_graft_success_total` - Successful Graft requests
- `plumtree_graft_failed_total` - Failed Graft requests
Histograms
- `plumtree_graft_latency_seconds` - Graft request to delivery latency
- `plumtree_message_size_bytes` - Message payload size distribution
- `plumtree_message_hops` - Number of hops messages travel
- `plumtree_ihave_batch_size` - IHave batch size distribution
Gauges
- `plumtree_eager_peers` - Current eager peer count
- `plumtree_lazy_peers` - Current lazy peer count
- `plumtree_cache_size` - Messages in cache
- `plumtree_pending_grafts` - Pending Graft requests
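These metrics are emitted through the `metrics` facade, so any compatible recorder can export them. For example, with the `metrics-exporter-prometheus` crate (a sketch; confirm the exporter's current API against its own documentation):

```rust
use metrics_exporter_prometheus::PrometheusBuilder;

fn install_prometheus_exporter() {
    // Install a global recorder; plumtree_* metrics recorded after this point
    // show up in the handle's Prometheus text output.
    let handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install Prometheus recorder");

    // Serve handle.render() from an HTTP endpoint of your choice.
    let _ = handle.render();
}
```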
Delegate Callbacks
Implement PlumtreeDelegate to receive protocol events:
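A minimal delegate might look like the sketch below, built around the `on_deliver` callback referenced elsewhere in this README. Treat the method name and signature as illustrative; in practice you would implement the crate's `PlumtreeDelegate` trait with its equivalent hooks:

```rust
use bytes::Bytes;
use std::sync::atomic::{AtomicU64, Ordering};

struct CountingDelegate {
    delivered: AtomicU64,
}

impl CountingDelegate {
    // Illustrative callback shape: count and log each delivered payload.
    fn on_deliver(&self, payload: Bytes) {
        let n = self.delivered.fetch_add(1, Ordering::Relaxed) + 1;
        println!("message #{}: {} bytes", n, payload.len());
    }
}
```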
Plumtree vs PlumtreeMemberlist
This crate provides two main APIs:
Plumtree - Core Protocol
The Plumtree struct implements the core Epidemic Broadcast Tree protocol. Use this when:
- You need fine-grained control over message handling
- You want to integrate with a custom transport layer
- You're building a custom networking stack
// Low-level API - you handle message transport (constructor and argument shapes illustrative)
let plumtree = Plumtree::new(node_id, config, delegate);
plumtree.handle_message(from_peer, incoming); // Manual message handling
PlumtreeMemberlist - Simplified Integration
The PlumtreeMemberlist struct wraps Plumtree with a simpler API designed for integration with memberlist. Use this when:
- You want a simpler broadcast API without manual message handling
- You're using memberlist for cluster membership and peer discovery
- You want automatic peer management based on memberlist events
use memberlist_plumtree::{PlumtreeConfig, PlumtreeMemberlist};

// High-level API - simpler to use (constructor arguments illustrative)
let plumtree_ml = PlumtreeMemberlist::new(node_id, config, delegate);

// Add peers discovered via memberlist
plumtree_ml.add_peer(peer_id);

// Broadcast - message reaches all nodes via spanning tree
let msg_id = plumtree_ml.broadcast(payload).await?;

// Access statistics
let peer_stats = plumtree_ml.peer_stats();
let cache_stats = plumtree_ml.cache_stats();
Key difference: PlumtreeMemberlist is a convenience wrapper that provides:
- Simplified broadcast API (just call `broadcast()`)
- Built-in statistics (peer counts, cache stats)
- Easy peer management (`add_peer`, `remove_peer`)
- Graceful shutdown handling
MemberlistStack - Full Integration (Recommended)
The MemberlistStack struct provides the complete integration stack, combining Plumtree with a real Memberlist instance for automatic SWIM-based peer discovery. This is the recommended entry point for production deployments:
use memberlist_plumtree::{MemberlistStack, PlumtreeConfig, PlumtreeNodeDelegate};
use memberlist::net::NetTransport; // transport import path illustrative
use std::net::SocketAddr;

// Create your transport (e.g., NetTransport for real networking; options illustrative)
let transport = NetTransport::new(transport_options).await?;

// Create the stack with PlumtreeNodeDelegate wrapping your delegate
let delegate = PlumtreeNodeDelegate::new(my_delegate);

// Build the complete stack (argument list illustrative)
let stack = MemberlistStack::new(transport, delegate, PlumtreeConfig::lan()).await?;

// Join the cluster - just pass seed addresses!
let seeds: Vec<SocketAddr> = vec!["192.168.1.100:7946".parse()?];
stack.join(&seeds).await?;

// Broadcast messages - automatically routed via the spanning tree
let msg_id = stack.broadcast(payload).await?;

// Get cluster status (accessor names illustrative)
println!("members: {:?}", stack.members());
println!("peer stats: {:?}", stack.peer_stats());

// Graceful shutdown
stack.leave().await?;
stack.shutdown().await?;
Key features of MemberlistStack:
- Automatic peer discovery: SWIM protocol discovers peers without manual `add_peer()` calls
- Simplified join API: Just pass seed addresses, no need to deal with `MaybeResolvedAddress`
- Integrated lifecycle: Single struct manages both Memberlist and Plumtree
- Peer sync: `PlumtreeNodeDelegate` automatically syncs Plumtree peers when Memberlist membership changes
- Lazarus seed recovery: Automatic reconnection to restarted seed nodes (see below)
- Peer persistence: Save known peers to disk for crash recovery
Lazarus Seed Recovery
The "Lazarus" feature solves the Ghost Seed Problem: when a seed node fails and restarts, other nodes have already marked it as dead and stopped probing it. Without intervention, the restarted seed remains isolated.
How it works:
- Configure static seed addresses in `BridgeConfig`
- Enable the Lazarus background task
- The task periodically checks if seeds are in the alive set
- Missing seeds are automatically probed and rejoined
use memberlist_plumtree::BridgeConfig;
use std::time::Duration;
use std::path::PathBuf;

// Configure Lazarus seed recovery (values and argument shapes illustrative)
let config = BridgeConfig::new()
    // Static seeds to monitor
    .with_static_seeds(vec!["192.168.1.100:7946".parse()?])
    // Enable the Lazarus background task
    .with_lazarus_enabled(true)
    // Probe interval (default: 30 seconds)
    .with_lazarus_interval(Duration::from_secs(30))
    // Optional: persist known peers for crash recovery
    .with_persistence_path(PathBuf::from("/var/lib/myapp/peers.txt"));

// After creating MemberlistStack, spawn the Lazarus task
let lazarus_handle = stack.spawn_lazarus_task();

// Monitor Lazarus statistics
let stats = lazarus_handle.stats();
println!("{:?}", stats);

// Gracefully shutdown when done
lazarus_handle.shutdown();
BridgeConfig Parameters:
| Parameter | Default | Description |
|---|---|---|
| `static_seeds` | `[]` | List of seed node addresses to monitor |
| `lazarus_enabled` | `false` | Enable the Lazarus background task |
| `lazarus_interval` | 30s | Interval between seed probes |
| `persistence_path` | `None` | Path to persist known peers |
| `log_changes` | `true` | Log topology changes |
| `auto_promote` | `true` | Auto-promote new peers based on fanout |
Peer Persistence
For crash recovery, MemberlistStack can save known peer addresses to disk. Combined with static seeds, this provides robust bootstrap options:
use memberlist_plumtree::{load_bootstrap_addresses, BridgeConfig}; // import paths illustrative
use std::path::PathBuf;

// Save current peers to file (call periodically or on shutdown; argument shape illustrative)
stack.save_peers_to_file(&PathBuf::from("/var/lib/myapp/peers.txt")).await?;

// On startup, load bootstrap addresses from both sources (values illustrative)
let config = BridgeConfig::new()
    .with_static_seeds(vec!["192.168.1.100:7946".parse()?])
    .with_persistence_path(PathBuf::from("/var/lib/myapp/peers.txt"));

// Combines static seeds + persisted peers (deduplicated)
let bootstrap_addrs = load_bootstrap_addresses(&config);

// Use bootstrap addresses to join the cluster
stack.join(&bootstrap_addrs).await?;
Persistence File Format:
# Comments are supported
192.168.1.100:7946
192.168.1.101:7946
192.168.1.102:7946
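For reference, the format above is simple enough to parse by hand; a minimal reader could look like the sketch below (illustrative only, since the crate's own `load_bootstrap_addresses` handles this for you):

```rust
use std::net::SocketAddr;

// Parse the peer persistence format: one address per line, '#' comments allowed.
fn parse_peer_file(contents: &str) -> Vec<SocketAddr> {
    contents
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .filter_map(|line| line.parse().ok()) // skip malformed entries
        .collect()
}
```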
Important: Each node should use a unique persistence path to avoid conflicts:
let node_id = "node-42";
let path = PathBuf::from(format!("/var/lib/myapp/peers-{}.txt", node_id)); // path pattern illustrative
When to use each API:
| API | Use Case |
|---|---|
| `Plumtree` | Custom transport, fine-grained control |
| `PlumtreeMemberlist` | Manual peer management, testing |
| `MemberlistStack` | Production - full SWIM + Plumtree integration |
Examples
Terminal Chat Example
An interactive terminal-based chat with fault injection for testing.
Features:
- 20 simulated nodes with full mesh topology
- Real-time protocol metrics (Grafts, Prunes, Promotions, Demotions)
- Fault injection controls:
  - `F` - Toggle node offline/online
  - `L` - Toggle 20% packet loss
  - `E` - Promote all peers to eager (triggers prunes)
  - `R` - Reset metrics
  - `M` - Toggle metrics panel
- User switching: Tab, Shift+Tab, number keys (1-9, 0=10), PageUp/PageDown
- Color-coded event log showing protocol activity
Web Chat Example
A web-based visualization with peer tree display. Run the example, then open http://localhost:3000 in your browser.
Features:
- Real-time WebSocket updates
- Visual peer tree showing eager (green) and lazy (orange) connections
- Interactive controls: user switching, fault injection, metrics reset
- Three-panel layout: Tree visualization | Chat messages | Event log + Metrics
- Static HTML/JS frontend served from `examples/static/`
Web UI Layout:
┌─────────────────┬─────────────────────┬─────────────────┐
│ Peer Tree │ Chat Messages │ Event Log │
│ │ │ │
│ [U3] │ [12:34:56] U1: Hi │ [00:15] GOSSIP │
│ / \ │ [12:34:57] U2: Hey │ [00:16] PRUNE │
│ [U2] [U4] │ │ [00:17] PROMOTE │
│ (eager) (lazy) │ > Type message... │ │
│ │ │ ┌─────────────┐ │
│ Legend: │ │ │ Metrics │ │
│ ● Current │ │ │ Sent: 5 │ │
│ ● Eager │ │ │ Recv: 12 │ │
│ ● Lazy │ │ │ Grafts: 2 │ │
└─────────────────┴─────────────────────┴─────────────────┘
Pub/Sub Example
A topic-based publish/subscribe system using `PlumtreeMemberlist`.
This demonstrates:
- Using `PlumtreeMemberlist` for simplified broadcasting
- Topic-based message routing
- Dynamic subscriptions (subscribe/unsubscribe at runtime)
- Message serialization/deserialization
- Statistics and monitoring
- Graceful peer removal and shutdown
Important: The pub/sub example uses application-layer filtering, not per-topic routing. This means:
┌─────────────────────────────────────────────────────────────────┐
│ How Pub/Sub Works │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Publisher ──broadcast──> ALL nodes via spanning tree │
│ │ │
│ ┌─────────┼─────────┐ │
│ ▼ ▼ ▼ │
│ Node A Node B Node C │
│ │ │ │ │
│ [subscribed] [not sub] [subscribed] │
│ │ │ │ │
│ PROCESS DISCARD PROCESS │
│ │
└─────────────────────────────────────────────────────────────────┘
- Messages are broadcast to all nodes via Plumtree's spanning tree
- Each node receives every message regardless of topic
- Topic filtering happens in the `on_deliver` callback (application layer); see the sketch below
- This is simple but means all nodes see all traffic
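A sketch of that application-layer filter, assuming a simple "topic, newline, payload" encoding and an `on_deliver`-style callback (the example's actual serialization and delegate signature may differ):

```rust
use std::collections::HashSet;

struct TopicFilter {
    subscriptions: HashSet<String>,
}

impl TopicFilter {
    // Called from the delivery callback: process only subscribed topics.
    fn on_deliver(&self, raw: &[u8]) {
        let text = String::from_utf8_lossy(raw);
        if let Some((topic, payload)) = text.split_once('\n') {
            if self.subscriptions.contains(topic) {
                println!("[{topic}] {payload}"); // PROCESS
            }
            // else: DISCARD - this node is not subscribed to the topic
        }
    }
}
```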
For true per-topic routing (separate spanning tree per topic), you would need to:
- Create multiple `PlumtreeMemberlist` instances (one per topic), or
- Implement topic-aware routing at the protocol level
The application-layer approach is suitable for:
- Small to medium clusters
- Scenarios where most nodes subscribe to most topics
- Simplicity over bandwidth optimization
Performance Characteristics
| Aspect | Complexity |
|---|---|
| Message broadcast | O(n) messages |
| Tree repair | O(1) per missing message |
| Memory per node | O(cache_size) |
| Latency | O(log n) hops typical |
Optimized for Scale
This implementation includes several optimizations for large-scale deployments (10,000+ nodes):
- Lock-free message rate tracking: Atomic counters for high-throughput recording
- Sharded deduplication: Reduces lock contention with configurable sharding
- RTT-based peer selection: Optimizes tree topology for latency
- Adaptive batching: Adjusts batch sizes based on network conditions
- Connection pooling: Efficient message delivery with per-peer queues
Roadmap
Phase 3: Protocol Extensions (Planned)
- Priority-based message routing: Support for message priorities affecting delivery order
- Topic-based spanning trees: Separate spanning trees per topic for efficient pub/sub
- Persistence layer: Optional WAL for message durability across restarts
- Compression: Optional message compression for bandwidth reduction
- Encryption: End-to-end encryption support for secure clusters
- Protocol versioning: Backward-compatible protocol evolution
- Cluster-aware batching: Batch size hints based on cluster topology
- Health-based peer selection: Factor in peer health metrics for routing decisions
References
- Epidemic Broadcast Trees - Original Plumtree paper by Leitão, Pereira, and Rodrigues (2007)
License
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.