Redisson 0.1.0

A Redis-based distributed synchronization and data structures library for Rust.

🎯 Features

  • 🔒 Distributed Locks: Reentrant locks, fair locks, read-write locks, and the RedLock algorithm, with automatic renewal
  • 📊 Rich Data Structures: Distributed maps, lists, sets, sorted sets, buckets, and streams
  • ⚡ Dual Runtime: Full support for both synchronous and asynchronous operations
  • 🔄 Synchronization Primitives: Semaphores, rate limiters, countdown latches, and atomic counters
  • 📈 High Performance: Connection pooling, command pipelining, and batch operations
  • 💾 Local Cache Integration: Read-through/write-through caching backed by an in-process local cache
  • 🔧 Comprehensive Configuration: Flexible configuration for various Redis deployment modes
  • 🎯 Type Safety: Full Rust type system support with compile-time checking
  • 🛡️ Production Ready: Automatic reconnection, timeout handling, and comprehensive error management
  • 📡 Advanced Features: Redis Stream support, delayed queues, and publish/subscribe messaging

📦 Installation

Add this to your Cargo.toml:

Basic Installation

[dependencies]
redisson = "0.1"

With Async Support (requires Tokio)

[dependencies]
redisson = { version = "0.1", features = ["async"] }
tokio = { version = "1", features = ["full"] }

With Additional Features

[dependencies]
redisson = { version = "0.1", features = ["async", "caching"] }

🚀 Quick Start

1. Basic Synchronous Usage

use redisson::{RedissonClient, RedissonConfig};
use std::time::Duration;

fn main() -> redisson::RedissonResult<()> {
    // Create configuration
    let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
        .with_pool_size(10)
        .with_connection_timeout(Duration::from_secs(5));

    // Create client
    let client = RedissonClient::new(config)?;

    // Use distributed lock
    let lock = client.get_lock("my-resource");
    lock.lock()?;
    println!("Critical section accessed");
    lock.unlock()?;

    // Use distributed data structures
    let bucket = client.get_bucket::<String>("my-bucket");
    bucket.set(&"Hello World".to_string())?;
    let value: Option<String> = bucket.get()?;
    println!("Bucket value: {:?}", value);

    // Use distributed map
    let map = client.get_map::<String, i32>("my-map");
    map.put(&"key1".to_string(), &42)?;

    client.shutdown()?;
    Ok(())
}

2. Asynchronous Usage

use redisson::{AsyncRedissonClient, RedissonConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> redisson::RedissonResult<()> {
    // Create async configuration
    let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
        .with_pool_size(10);

    // Create async client
    let client = AsyncRedissonClient::new(config).await?;

    // Use async distributed lock
    let lock = client.get_lock("async-resource");
    lock.lock().await?;
    println!("Async critical section accessed");
    lock.unlock().await?;

    // Use async data operations
    let bucket = client.get_bucket::<String>("async-data");
    bucket.set(&"Async Value".to_string()).await?;
    let value = bucket.get().await?;
    println!("Async value: {:?}", value);

    client.shutdown().await?;
    Ok(())
}

3. Distributed Lock with Watchdog

use redisson::RedissonClient;
use std::time::Duration;

fn lock_example(client: &RedissonClient) -> redisson::RedissonResult<()> {
    // Get a reentrant lock with automatic renewal
    let lock = client.get_lock("database-update");
    
    // Try to acquire lock with timeout
    if lock.try_lock_with_timeout(Duration::from_secs(5))? {
        // Lock acquired - watchdog will automatically renew the lock
        println!("Lock acquired, performing critical operations...");
        
        // Simulate long-running operation
        std::thread::sleep(Duration::from_secs(30));
        
        lock.unlock()?;
        println!("Lock released");
    } else {
        println!("Failed to acquire lock within timeout");
    }
    
    Ok(())
}

4. Redis Stream Support

use redisson::RedissonClient;
use std::collections::HashMap;

fn stream_example(client: &RedissonClient) -> redisson::RedissonResult<()> {
    let stream = client.get_stream::<String>("orders:stream");
    
    // Create consumer group
    stream.create_group("order-processors", "0")?;
    
    // Add messages to stream
    let mut fields = HashMap::new();
    fields.insert("order".to_string(), "Order #1234".to_string());
    let message_id = stream.add_auto_id(&fields)?;
    println!("Message added with ID: {}", message_id);
    
    // Read messages from consumer group
    let messages = stream.read_group("order-processors", "consumer-1", Some(10), None, false)?;
    for message in messages {
        println!("Received message: {:?}", message);
    }
    
    Ok(())
}

5. Local Cache Integration

use redisson::RedissonClient;
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct UserSession {
    user_id: String,
    session_token: String,
    last_activity: u64,
}

fn cache_example(client: &RedissonClient) -> redisson::RedissonResult<()> {
    // Create cache with local caching
    let user_cache = client.get_cache::<String, UserSession>("user_sessions");
    
    // Set data (will be cached locally)
    let session = UserSession {
        user_id: "user123".to_string(),
        session_token: "abc123def456".to_string(),
        last_activity: 1234567890,
    };
    
    user_cache.set("user123".to_string(), session.clone())?;
    
    // First read (may go to Redis)
    let cached = user_cache.get(&"user123".to_string())?;
    println!("First read: {:?}", cached);
    
    // Second read (from local cache - faster)
    let cached_again = user_cache.get(&"user123".to_string())?;
    println!("Second read: {:?}", cached_again);
    
    // Get cache statistics
    let cache_stats = user_cache.get_local_cache().get_stats();
    println!("Cache hits: {}", cache_stats.total_hits);
    
    Ok(())
}

📊 Complete Example: Order Processing System

use redisson::{RedissonClient, RedissonConfig, RedissonResult};
use std::time::Duration;
use std::collections::HashMap;
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
    id: String,
    customer_id: String,
    amount: f64,
    status: OrderStatus,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
enum OrderStatus {
    Pending,
    Processing,
    Completed,
    Failed,
}

fn main() -> redisson::RedissonResult<()> {
    let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
        .with_pool_size(20)
        .with_connection_timeout(Duration::from_secs(5));

    let client = RedissonClient::new(config)?;

    // 1. Use distributed lock for order processing
    process_order_with_lock(&client)?;
    
    // 2. Use Redis Stream for order events
    publish_order_events(&client)?;
    
    // 3. Use cache for order data
    cache_order_data(&client)?;
    
    // 4. Use rate limiter for API calls
    rate_limit_api_calls(&client)?;
    
    client.shutdown()?;
    Ok(())
}

fn process_order_with_lock(client: &RedissonClient) -> RedissonResult<()> {
    let lock = client.get_lock("order:process:123");
    
    if lock.try_lock_with_timeout(Duration::from_secs(10))? {
        println!("Processing order 123...");
        // Simulate order processing
        std::thread::sleep(Duration::from_secs(2));
        lock.unlock()?;
        println!("Order processing completed");
    }
    
    Ok(())
}

fn publish_order_events(client: &RedissonClient) -> RedissonResult<()> {
    let order_stream = client.get_stream::<Order>("orders:events");
    
    let order = Order {
        id: "ORD-001".to_string(),
        customer_id: "CUST-001".to_string(),
        amount: 299.99,
        status: OrderStatus::Processing,
    };
    
    let mut fields = HashMap::new();
    fields.insert("order".to_string(), order);
    
    let message_id = order_stream.add_auto_id(&fields)?;
    println!("Order event published with ID: {}", message_id);
    
    Ok(())
}

fn cache_order_data(client: &RedissonClient) -> RedissonResult<()> {
    let order_cache = client.get_cache::<String, Order>("orders:cache");
    
    let order = Order {
        id: "ORD-001".to_string(),
        customer_id: "CUST-001".to_string(),
        amount: 299.99,
        status: OrderStatus::Pending,
    };
    
    order_cache.set("ORD-001".to_string(), order)?;
    
    // Subsequent reads will be faster with local cache
    let cached_order = order_cache.get(&"ORD-001".to_string())?;
    println!("Cached order: {:?}", cached_order);
    
    Ok(())
}

fn rate_limit_api_calls(client: &RedissonClient) -> RedissonResult<()> {
    let rate_limiter = client.get_rate_limiter("api:orders", 10.0, 20.0); // 10 req/s, burst 20
    
    for i in 1..=15 {
        if rate_limiter.try_acquire(1.0)? {
            println!("API call {}: Allowed", i);
        } else {
            println!("API call {}: Rate limited", i);
        }
        std::thread::sleep(Duration::from_millis(50));
    }
    
    Ok(())
}

🔧 Configuration

Basic Configuration

use redisson::RedissonConfig;
use std::time::Duration;

let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
    .with_pool_size(20)                    // Connection pool size
    .with_connection_timeout(Duration::from_secs(5))
    .with_response_timeout(Duration::from_secs(3))
    .with_lock_expire_time(Duration::from_secs(30))
    .with_watchdog_timeout(Duration::from_secs(10))
    .with_retry_count(3)
    .with_drift_factor(0.01)               // Clock drift factor for RedLock
    .with_backup_pool_count(2);

Redis Cluster Mode

let config = RedissonConfig::cluster(vec![
    "redis://127.0.0.1:7000",
    "redis://127.0.0.1:7001",
    "redis://127.0.0.1:7002",
])
.with_cluster_scan_interval(Duration::from_secs(5))
.with_pool_size(10);

Redis Sentinel Mode

let config = RedissonConfig::sentinel(vec![
    "redis://127.0.0.1:26379",
    "redis://127.0.0.1:26380",
    "redis://127.0.0.1:26381",
], "mymaster")
.with_sentinel_password("password")
.with_database(0);

📊 Supported Data Structures

Data Structure    Sync Support  Async Support  Description
RBucket           ✅            ✅             Simple key-value storage
RMap              ✅            ✅             Distributed hash map
RList             ✅            ✅             Distributed list
RSet              ✅            ✅             Distributed set
RSortedSet        ✅            ✅             Distributed sorted set
RStream           ✅            ✅             Redis Stream with consumer groups
RLock             ✅            ✅             Reentrant distributed lock
RFairLock         ✅            ✅             Fair distributed lock
RReadWriteLock    ✅            ✅             Read-write distributed lock
RRedLock          ✅            ✅             RedLock algorithm implementation
RSemaphore        ✅            ✅             Distributed semaphore
RRateLimiter      ✅            ✅             Distributed rate limiter
RCountDownLatch   ✅            ✅             Distributed countdown latch
RAtomicLong       ✅            ✅             Distributed atomic long
RTopic            ✅            ✅             Publish/subscribe messaging
RDelayedQueue     ✅            ✅             Delayed task queue
RCache            ✅            ✅             Local cache with Redis backend
RBatch            ✅            ✅             Batch operation support
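
Most of these types follow the same accessor pattern as get_lock and get_bucket from the Quick Start. The snippet below is a minimal sketch of a few primitives that do not appear elsewhere in this README; the accessor and method names (get_semaphore, get_atomic_long, get_count_down_latch, acquire, increment_and_get, count_down) are assumed by analogy and may differ from the actual API.

// Sketch only: the names below are assumed by analogy with get_lock/get_bucket
// and are not confirmed by this README.
fn primitives_example(client: &RedissonClient) -> redisson::RedissonResult<()> {
    // Distributed semaphore: bound concurrent work across processes
    let semaphore = client.get_semaphore("jobs:slots");
    semaphore.acquire(1)?;
    // ... perform bounded work ...
    semaphore.release(1)?;

    // Distributed atomic counter
    let counter = client.get_atomic_long("orders:processed");
    let total = counter.increment_and_get()?;
    println!("Orders processed so far: {}", total);

    // Countdown latch: signal that this process finished its part
    let latch = client.get_count_down_latch("startup:latch");
    latch.count_down()?;

    Ok(())
}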

🛠️ Advanced Features

Batch Operations

fn batch_operations(client: &RedissonClient) -> RedissonResult<()> {
    let mut batch = client.create_batch();
    
    // Add multiple operations
    for i in 1..=100 {
        batch = batch.set(&format!("key:{}", i), &format!("value:{}", i));
    }
    
    // Execute all operations in a single network call
    let results = batch.execute()?;
    println!("Batch executed with {} results", results.len());
    
    Ok(())
}

Transaction Support

fn transaction_example(client: &RedissonClient) -> RedissonResult<()> {
    // Execute operations in a transaction
    let result = client.execute_transaction(|tx| {
        let balance: i64 = tx.query("account:balance")?;
        
        if balance >= 100 {
            tx.set("account:balance", &(balance - 100))?
                .set("transaction:log", &"Withdrawn 100".to_string())?;
            Ok(())
        } else {
            Err(redisson::RedissonError::InvalidOperation(
                "Insufficient balance".to_string()
            ))
        }
    });
    
    match result {
        Ok(_) => println!("Transaction successful"),
        Err(e) => println!("Transaction failed: {}", e),
    }
    
    Ok(())
}

Watchdog Mechanism

The watchdog automatically renews locks to prevent premature expiration during long-running operations:

fn watchdog_example(client: &RedissonClient) -> RedissonResult<()> {
    let lock = client.get_lock("long-running-task");
    
    // Lock with automatic renewal
    lock.lock()?;
    
    // Long-running operation - watchdog will renew the lock
    for i in 1..=60 {
        println!("Processing step {}...", i);
        std::thread::sleep(Duration::from_secs(1));
        
        // Lock will be automatically renewed every 10 seconds
        // (configurable via RedissonConfig)
    }
    
    lock.unlock()?;
    Ok(())
}

📈 Performance Optimization

1. Connection Pooling

let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
    .with_pool_size(50)                    // Adjust based on workload
    .with_idle_timeout(Duration::from_secs(60));

2. Batch Operations for Bulk Data

// Instead of individual calls
for item in items {
    map.put(&item.key, &item.value)?;
}

// Use batch operations
let mut batch = client.create_batch();
for item in items {
    batch = batch.set(&item.key, &item.value);
}
batch.execute()?;

3. Local Cache for Read-Heavy Workloads

let cache = client.get_cache::<String, Data>("read-heavy-data")
    .with_local_cache_size(1000)           // Cache 1000 items locally
    .with_local_cache_ttl(Duration::from_secs(30));

4. Pipeline Configuration

let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
    .with_batch_config(BatchConfig::default()
        .with_max_batch_size(50)           // Optimal batch size
        .with_pipeline(true)               // Enable pipelining
        .with_max_wait_time(Duration::from_millis(10)));

🔍 Monitoring and Statistics

fn monitor_performance(client: &RedissonClient) -> RedissonResult<()> {
    let stats = client.get_stats();
    
    println!("Connection Pool Statistics:");
    println!("  Total connections: {}", stats.connection_stats.total_connections_created);
    println!("  Connection reuse rate: {:.1}%", stats.connection_stats.connection_reuse_rate());
    println!("  Peak connections: {}", stats.connection_stats.peak_connections);
    
    println!("\nCache Statistics:");
    println!("  Cache hit rate: {:.1}%", stats.cache_stats.avg_hit_rate * 100.0);
    println!("  Total hits: {}", stats.cache_stats.total_hits);
    println!("  Total misses: {}", stats.cache_stats.total_misses);
    
    println!("\nBatch Statistics:");
    println!("  Total batches: {}", stats.batch_stats.total_batches);
    println!("  Average batch size: {:.1}", stats.batch_stats.avg_batch_size);
    
    Ok(())
}

🧪 Testing

Unit Tests

#[cfg(test)]
mod tests {
    use super::*;
    use redisson::{RedissonClient, RedissonConfig, RedissonResult};
    
    #[test]
    fn test_distributed_lock() -> RedissonResult<()> {
        let config = RedissonConfig::single_server("redis://127.0.0.1:6379");
        let client = RedissonClient::new(config)?;
        
        let lock = client.get_lock("test:lock");
        assert!(lock.try_lock()?);
        assert!(lock.is_locked()?);
        lock.unlock()?;
        assert!(!lock.is_locked()?);
        
        client.shutdown()?;
        Ok(())
    }
    
    #[test]
    fn test_cache_operations() -> RedissonResult<()> {
        let config = RedissonConfig::single_server("redis://127.0.0.1:6379");
        let client = RedissonClient::new(config)?;
        
        let cache = client.get_cache::<String, String>("test:cache");
        cache.set("key".to_string(), "value".to_string())?;
        
        let value = cache.get(&"key".to_string())?;
        assert_eq!(value, Some("value".to_string()));
        
        cache.clear()?;
        let cleared = cache.get(&"key".to_string())?;
        assert_eq!(cleared, None);
        
        client.shutdown()?;
        Ok(())
    }
}

Benchmark Tests

# Run benchmarks
cargo bench

# Run specific benchmark
cargo bench --bench lock_benchmark
cargo bench --bench cache_benchmark
cargo bench --bench batch_benchmark
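
The benchmark names above map to files under benches/. As a rough sketch of what lock_benchmark could look like (assuming the Criterion harness as a dev-dependency, which this README does not state), something along these lines works:

// benches/lock_benchmark.rs - illustrative sketch; assumes `criterion` is available
// and a local Redis instance is running.
use criterion::{criterion_group, criterion_main, Criterion};
use redisson::{RedissonClient, RedissonConfig};

fn bench_lock(c: &mut Criterion) {
    let config = RedissonConfig::single_server("redis://127.0.0.1:6379");
    let client = RedissonClient::new(config).expect("Redis must be reachable");

    c.bench_function("lock_unlock", |b| {
        b.iter(|| {
            let lock = client.get_lock("bench:lock");
            lock.lock().unwrap();
            lock.unlock().unwrap();
        })
    });
}

criterion_group!(benches, bench_lock);
criterion_main!(benches);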

🚀 Deployment

Docker Deployment

FROM rust:1.60 as builder
WORKDIR /usr/src/redisson
COPY . .
RUN cargo build --release

FROM debian:bullseye-slim
RUN apt-get update && apt-get install -y libssl-dev ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /usr/src/redisson/target/release/redisson-example /usr/local/bin/
CMD ["redisson-example"]

Kubernetes Configuration

apiVersion: apps/v1
kind: Deployment
metadata:
  name: redisson-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: redisson
  template:
    metadata:
      labels:
        app: redisson
    spec:
      containers:
      - name: redisson
        image: your-registry/redisson-app:latest
        env:
        - name: REDIS_URL
          value: "redis://redis-master:6379"
        - name: REDIS_POOL_SIZE
          value: "20"
        resources:
          requests:
            memory: "64Mi"
            cpu: "100m"
          limits:
            memory: "128Mi"
            cpu: "200m"

🔒 Security Best Practices

1. Secure Configuration

let config = RedissonConfig::single_server("redis://:password@secure-redis.example.com:6379")
    .with_connection_timeout(Duration::from_secs(10))
    .with_retry_count(3);
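
Embedding the password directly in the URL is shown above for brevity; in practice it is safer to pull the credential from the environment or a secret store at startup. A minimal sketch (REDIS_PASSWORD is an illustrative variable name):

use redisson::RedissonConfig;
use std::env;
use std::time::Duration;

// Sketch: read the Redis password from the environment instead of hard-coding it
let password = env::var("REDIS_PASSWORD").expect("REDIS_PASSWORD must be set");
let url = format!("redis://:{}@secure-redis.example.com:6379", password);

let config = RedissonConfig::single_server(url.as_str())
    .with_connection_timeout(Duration::from_secs(10))
    .with_retry_count(3);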

2. Connection Pool Security

let config = RedissonConfig::single_server("redis://127.0.0.1:6379")
    .with_pool_size(10)                    // Limit connection pool size
    .with_max_lifetime(Duration::from_secs(3600))  // Rotate connections periodically
    .with_idle_timeout(Duration::from_secs(300));   // Close idle connections

3. Redis ACL Support

let config = RedissonConfig::single_server("redis://username:password@127.0.0.1:6379")
    .with_database(0);                      // Use specific database

🤝 Contributing

We welcome contributions! Here's how you can help:

  1. Report Bugs: Create an issue with detailed information
  2. Suggest Features: Start a discussion about new features
  3. Submit PRs: Follow our contributing guidelines
  4. Improve Documentation: Help us make the docs better
  5. Add Examples: Create useful examples for common use cases

Development Setup

# Clone the repository
git clone https://github.com/wslongchen/redisson.git
cd redisson

# Run tests
cargo test

# Run tests with all features
cargo test --all-features

# Run benchmarks
cargo bench

# Build documentation
cargo doc --open

# Run examples
cargo run --example basic
cargo run --example async_example --features async

Code Style

  • Follow Rust conventions and clippy suggestions
  • Use meaningful commit messages
  • Add tests for new features
  • Update documentation when adding features
  • Keep API consistent and backward-compatible

📄 License

Licensed under either of:

  • Apache License, Version 2.0 (LICENSE-APACHE)

  • MIT license (LICENSE-MIT)

at your option.

🙏 Acknowledgments

  • Thanks to all contributors who have helped shape Redisson

  • Inspired by the Java Redisson library

  • Built with ❤️ by the Cat&Dog Lab team

📞 Contact

  • Author: Mr.Pan

  • Email: 1049058427@qq.com

  • GitHub: @wslongchen

  • Project: redisson on GitHub