Redisson
๐ฏ Features
- ๐ Distributed Locks: Reentrant locks, fair locks, read-write locks, and RedLock algorithm with automatic renewal
- ๐ Rich Data Structures: Distributed maps, lists, sets, sorted sets, buckets, and streams
- โก Dual Runtime: Full support for both synchronous and asynchronous operations
- ๐ Synchronization Primitives: Semaphores, rate limiters, countdown latches, and atomic counters
- ๐ High Performance: Connection pooling, command pipelining, and batch operations
- ๐พ Local Cache Integration: Read-through/write-through caching with local cache
- ๐ง Comprehensive Configuration: Flexible configuration for various Redis deployment modes
- ๐ฏ Type Safety: Full Rust type system support with compile-time checking
- ๐ก๏ธ Production Ready: Automatic reconnection, timeout handling, and comprehensive error management
- ๐ก Advanced Features: Redis Stream support, delayed queues, and publish/subscribe messaging
๐ฆ Installation
Add this to your Cargo.toml:
Basic Installation
[dependencies]
redisson = "0.1"
With Async Support (requires Tokio)
[dependencies]
redisson = { version = "0.1", features = ["async"] }
tokio = { version = "1", features = ["full"] }
With Additional Features
[dependencies]
redisson = { version = "0.1", features = ["async", "caching"] }
๐ Quick Start
1. Basic Synchronous Usage
use redisson::{RedissonClient, RedissonConfig};
use std::time::Duration;
fn main() -> redisson::RedissonResult<()> {
let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
.with_pool_size(10)
.with_connection_timeout(Duration::from_secs(5));
let client = RedissonClient::new(config)?;
let lock = client.get_lock("my-resource");
lock.lock()?;
println!("Critical section accessed");
lock.unlock()?;
let bucket = client.get_bucket::<String>("my-bucket");
bucket.set(&"Hello World".to_string())?;
let value: Option<String> = bucket.get()?;
println!("Bucket value: {:?}", value);
let map = client.get_map::<String, i32>("my-map");
map.put(&"key1".to_string(), &42)?;
client.shutdown()?;
Ok(())
}
2. Asynchronous Usage
use redisson::{AsyncRedissonClient, RedissonConfig};
use std::time::Duration;
#[tokio::main]
async fn main() -> redisson::RedissonResult<()> {
let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
.with_pool_size(10);
let client = AsyncRedissonClient::new(config).await?;
let lock = client.get_lock("async-resource");
lock.lock().await?;
println!("Async critical section accessed");
lock.unlock().await?;
let bucket = client.get_bucket::<String>("async-data");
bucket.set(&"Async Value".to_string()).await?;
let value = bucket.get().await?;
println!("Async value: {:?}", value);
client.shutdown().await?;
Ok(())
}
3. Distributed Lock with Watchdog
use std::time::Duration;
fn lock_example(client: &RedissonClient) -> redisson::RedissonResult<()> {
let lock = client.get_lock("database-update");
if lock.try_lock_with_timeout(Duration::from_secs(5))? {
println!("Lock acquired, performing critical operations...");
std::thread::sleep(Duration::from_secs(30));
lock.unlock()?;
println!("Lock released");
} else {
println!("Failed to acquire lock within timeout");
}
Ok(())
}
4. Redis Stream Support
use std::collections::HashMap;
fn stream_example(client: &RedissonClient) -> redisson::RedissonResult<()> {
let stream = client.get_stream::<String>("orders:stream");
stream.create_group("order-processors", "0")?;
let mut fields = HashMap::new();
fields.insert("order".to_string(), "Order #1234".to_string());
let message_id = stream.add_auto_id(&fields)?;
println!("Message added with ID: {}", message_id);
let messages = stream.read_group("order-processors", "consumer-1", Some(10), None, false)?;
for message in messages {
println!("Received message: {:?}", message);
}
Ok(())
}
5. Local Cache Integration
use serde::{Serialize, Deserialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
struct UserSession {
user_id: String,
session_token: String,
last_activity: u64,
}
fn cache_example(client: &RedissonClient) -> redisson::RedissonResult<()> {
let user_cache = client.get_cache::<String, UserSession>("user_sessions");
let session = UserSession {
user_id: "user123".to_string(),
session_token: "abc123def456".to_string(),
last_activity: 1234567890,
};
user_cache.set("user123".to_string(), session.clone())?;
let cached = user_cache.get(&"user123".to_string())?;
println!("First read: {:?}", cached);
let cached_again = user_cache.get(&"user123".to_string())?;
println!("Second read: {:?}", cached_again);
let cache_stats = user_cache.get_local_cache().get_stats();
println!("Cache hits: {}", cache_stats.total_hits);
Ok(())
}
๐ Complete Example: Order Processing System
use redisson::{RedissonClient, RedissonConfig};
use std::time::Duration;
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
id: String,
customer_id: String,
amount: f64,
status: OrderStatus,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
enum OrderStatus {
Pending,
Processing,
Completed,
Failed,
}
fn main() -> redisson::RedissonResult<()> {
let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
.with_pool_size(20)
.with_connection_timeout(Duration::from_secs(5));
let client = RedissonClient::new(config)?;
process_order_with_lock(&client)?;
publish_order_events(&client)?;
cache_order_data(&client)?;
rate_limit_api_calls(&client)?;
client.shutdown()?;
Ok(())
}
fn process_order_with_lock(client: &RedissonClient) -> RedissonResult<()> {
let lock = client.get_lock("order:process:123");
if lock.try_lock_with_timeout(Duration::from_secs(10))? {
println!("Processing order 123...");
std::thread::sleep(Duration::from_secs(2));
lock.unlock()?;
println!("Order processing completed");
}
Ok(())
}
fn publish_order_events(client: &RedissonClient) -> RedissonResult<()> {
let order_stream = client.get_stream::<Order>("orders:events");
let order = Order {
id: "ORD-001".to_string(),
customer_id: "CUST-001".to_string(),
amount: 299.99,
status: OrderStatus::Processing,
};
let mut fields = HashMap::new();
fields.insert("order".to_string(), order);
let message_id = order_stream.add_auto_id(&fields)?;
println!("Order event published with ID: {}", message_id);
Ok(())
}
fn cache_order_data(client: &RedissonClient) -> RedissonResult<()> {
let order_cache = client.get_cache::<String, Order>("orders:cache");
let order = Order {
id: "ORD-001".to_string(),
customer_id: "CUST-001".to_string(),
amount: 299.99,
status: OrderStatus::Pending,
};
order_cache.set("ORD-001".to_string(), order)?;
let cached_order = order_cache.get(&"ORD-001".to_string())?;
println!("Cached order: {:?}", cached_order);
Ok(())
}
fn rate_limit_api_calls(client: &RedissonClient) -> RedissonResult<()> {
let rate_limiter = client.get_rate_limiter("api:orders", 10.0, 20.0);
for i in 1..=15 {
if rate_limiter.try_acquire(1.0)? {
println!("API call {}: Allowed", i);
} else {
println!("API call {}: Rate limited", i);
}
std::thread::sleep(Duration::from_millis(50));
}
Ok(())
}
๐ง Configuration
Basic Configuration
use redisson::RedissonConfig;
use std::time::Duration;
let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
.with_pool_size(20) .with_connection_timeout(Duration::from_secs(5))
.with_response_timeout(Duration::from_secs(3))
.with_lock_expire_time(Duration::from_secs(30))
.with_watchdog_timeout(Duration::from_secs(10))
.with_retry_count(3)
.with_drift_factor(0.01) .with_backup_pool_count(2);
Redis Cluster Mode
let config = RedissonConfig::cluster(vec![
"redis://127.0.0.1:7000",
"redis://127.0.0.1:7001",
"redis://127.0.0.1:7002",
])
.with_cluster_scan_interval(Duration::from_secs(5))
.with_pool_size(10);
Redis Sentinel Mode
let config = RedissonConfig::sentinel(vec![
"redis://127.0.0.1:26379",
"redis://127.0.0.1:26380",
"redis://127.0.0.1:26381",
], "mymaster")
.with_sentinel_password("password")
.with_database(0);
๐ Supported Data Structures
| Data Structure |
Sync Support |
Async Support |
Description |
| RBucket |
โ
|
โ
|
Simple key-value storage |
| RMap |
โ
|
โ
|
Distributed hash map |
| RList |
โ
|
โ
|
Distributed list |
| RSet |
โ
|
โ
|
Distributed set |
| RSortedSet |
โ
|
โ
|
Distributed sorted set |
| RStream |
โ
|
โ
|
Redis Stream with consumer groups |
| RLock |
โ
|
โ
|
Reentrant distributed lock |
| RFairLock |
โ
|
โ
|
Fair distributed lock |
| RReadWriteLock |
โ
|
โ
|
Read-write distributed lock |
| RRedLock |
โ
|
โ
|
RedLock algorithm implementation |
| RSemaphore |
โ
|
โ
|
Distributed semaphore |
| RRateLimiter |
โ
|
โ
|
Distributed rate limiter |
| RCountDownLatch |
โ
|
โ
|
Distributed countdown latch |
| RAtomicLong |
โ
|
โ
|
Distributed atomic long |
| RTopic |
โ
|
โ
|
Publish/subscribe messaging |
| RDelayedQueue |
โ
|
โ
|
Delayed task queue |
| RCache |
โ
|
โ
|
Local cache with Redis backend |
| RBatch |
โ
|
โ
|
Batch operation support |
๐ ๏ธ Advanced Features
Batch Operations
fn batch_operations(client: &RedissonClient) -> RedissonResult<()> {
let mut batch = client.create_batch();
for i in 1..=100 {
batch = batch.set(&format!("key:{}", i), &format!("value:{}", i));
}
let results = batch.execute()?;
println!("Batch executed with {} results", results.len());
Ok(())
}
Transaction Support
fn transaction_example(client: &RedissonClient) -> RedissonResult<()> {
let result = client.execute_transaction(|tx| {
let balance: i64 = tx.query("account:balance")?;
if balance >= 100 {
tx.set("account:balance", &(balance - 100))?
.set("transaction:log", &"Withdrawn 100".to_string())?;
Ok(())
} else {
Err(redisson::RedissonError::InvalidOperation(
"Insufficient balance".to_string()
))
}
});
match result {
Ok(_) => println!("Transaction successful"),
Err(e) => println!("Transaction failed: {}", e),
}
Ok(())
}
Watchdog Mechanism
The watchdog automatically renews locks to prevent premature expiration during long-running operations:
fn watchdog_example(client: &RedissonClient) -> RedissonResult<()> {
let lock = client.get_lock("long-running-task");
lock.lock()?;
for i in 1..=60 {
println!("Processing step {}...", i);
std::thread::sleep(Duration::from_secs(1));
}
lock.unlock()?;
Ok(())
}
๐ Performance Optimization
1. Connection Pooling
let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
.with_pool_size(50) .with_idle_timeout(Duration::from_secs(60));
2. Batch Operations for Bulk Data
for item in items {
map.put(&item.key, &item.value)?;
}
let mut batch = client.create_batch();
for item in items {
batch = batch.set(&item.key, &item.value);
}
batch.execute()?;
3. Local Cache for Read-Heavy Workloads
let cache = client.get_cache::<String, Data>("read-heavy-data")
.with_local_cache_size(1000) .with_local_cache_ttl(Duration::from_secs(30));
4. Pipeline Configuration
let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
.with_batch_config(BatchConfig::default()
.with_max_batch_size(50) .with_pipeline(true) .with_max_wait_time(Duration::from_millis(10)));
๐ Monitoring and Statistics
fn monitor_performance(client: &RedissonClient) -> RedissonResult<()> {
let stats = client.get_stats();
println!("Connection Pool Statistics:");
println!(" Total connections: {}", stats.connection_stats.total_connections_created);
println!(" Connection reuse rate: {:.1}%", stats.connection_stats.connection_reuse_rate());
println!(" Peak connections: {}", stats.connection_stats.peak_connections);
println!("\nCache Statistics:");
println!(" Cache hit rate: {:.1}%", stats.cache_stats.avg_hit_rate * 100.0);
println!(" Total hits: {}", stats.cache_stats.total_hits);
println!(" Total misses: {}", stats.cache_stats.total_misses);
println!("\nBatch Statistics:");
println!(" Total batches: {}", stats.batch_stats.total_batches);
println!(" Average batch size: {:.1}", stats.batch_stats.avg_batch_size);
Ok(())
}
๐งช Testing
Unit Tests
#[cfg(test)]
mod tests {
use super::*;
use redisson::{RedissonClient, RedissonConfig};
#[test]
fn test_distributed_lock() -> RedissonResult<()> {
let config = RedissonConfig::single_server("redis://127.0.0.1:6379");
let client = RedissonClient::new(config)?;
let lock = client.get_lock("test:lock");
assert!(lock.try_lock()?);
assert!(lock.is_locked()?);
lock.unlock()?;
assert!(!lock.is_locked()?);
client.shutdown()?;
Ok(())
}
#[test]
fn test_cache_operations() -> RedissonResult<()> {
let config = RedissonConfig::single_server("redis://127.0.0.1:6379");
let client = RedissonClient::new(config)?;
let cache = client.get_cache::<String, String>("test:cache");
cache.set("key".to_string(), "value".to_string())?;
let value = cache.get(&"key".to_string())?;
assert_eq!(value, Some("value".to_string()));
cache.clear()?;
let cleared = cache.get(&"key".to_string())?;
assert_eq!(cleared, None);
client.shutdown()?;
Ok(())
}
}
Benchmark Tests
cargo bench
cargo bench --bench lock_benchmark
cargo bench --bench cache_benchmark
cargo bench --bench batch_benchmark
๐ Deployment
Docker Deployment
FROM rust:1.60 as builder
WORKDIR /usr/src/redisson
COPY . .
RUN cargo build --release
FROM debian:bullseye-slim
RUN apt-get update && apt-get install -y libssl-dev ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /usr/src/redisson/target/release/redisson-example /usr/local/bin/
CMD ["redisson-example"]
Kubernetes Configuration
apiVersion: apps/v1
kind: Deployment
metadata:
name: redisson-app
spec:
replicas: 3
selector:
matchLabels:
app: redisson
template:
metadata:
labels:
app: redisson
spec:
containers:
- name: redisson
image: your-registry/redisson-app:latest
env:
- name: REDIS_URL
value: "redis://redis-master:6379"
- name: REDIS_POOL_SIZE
value: "20"
resources:
requests:
memory: "64Mi"
cpu: "100m"
limits:
memory: "128Mi"
cpu: "200m"
๐ Security Best Practices
1. Secure Configuration
let config = RedissonConfig::single_server("redis://:password@secure-redis.example.com:6379")
.with_connection_timeout(Duration::from_secs(10))
.with_retry_count(3);
2. Connection Pool Security
let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
.with_pool_size(10) .with_max_lifetime(Duration::from_secs(3600)) .with_idle_timeout(Duration::from_secs(300));
3. Redis ACL Support
let config = RedissonConfig::single_server("redis://username:password@127.0.0.1:6379")
.with_database(0);
๐ค Contributing
We welcome contributions! Here's how you can help:
- Report Bugs: Create an issue with detailed information
- Suggest Features: Start a discussion about new features
- Submit PRs: Follow our contributing guidelines
- Improve Documentation: Help us make the docs better
- Add Examples: Create useful examples for common use cases
Development Setup
git clone https://github.com/wslongchen/redisson.git
cd redisson
cargo test
cargo test --all-features
cargo bench
cargo doc --open
cargo run --example basic
cargo run --example async_example --features async
Code Style
- Follow Rust conventions and clippy suggestions
- Use meaningful commit messages
- Add tests for new features
- Update documentation when adding features
- Keep API consistent and backward-compatible
๐ License
Licensed under either of:
at your option.
๐ Acknowledgments
-
Thanks to all contributors who have helped shape Akita
-
Inspired by great ORMs like Diesel, SQLx, and MyBatis
-
Built with โค๏ธ by the Cat&Dog Lab team
๐ Contact