use std::time::Duration;
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use redisson::{BatchResult, Cache, RedissonClient, RedissonConfig, RedissonResult, SetCommand};
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
id: String,
customer_id: String,
amount: f64,
items: Vec<OrderItem>,
status: OrderStatus,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct OrderItem {
product_id: String,
quantity: i32,
price: f64,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
enum OrderStatus {
Pending,
Processing,
Shipped,
Delivered,
Cancelled,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct UserSession {
user_id: String,
session_token: String,
created_at: u64,
last_activity: u64,
ip_address: String,
}
fn main() -> RedissonResult<()> {
println!("๐ Redisson Rust - Complete Optimized Example");
println!("=============================================\n");
let config = RedissonConfig::single_server("redis://172.16.8.16:6379")
.with_pool_size(20)
.with_connection_timeout(Duration::from_secs(5))
.with_response_timeout(Duration::from_secs(3))
.with_lock_expire_time(Duration::from_secs(30))
.with_watchdog_timeout(Duration::from_secs(10))
.with_retry_count(3)
.with_drift_factor(0.01)
.with_backup_pool_count(2);
let client = RedissonClient::new(config)?;
println!("โ
Client created successfully\n");
println!("๐ก Redis Stream Demo");
stream_demo(&client)?;
println!("\n๐ Batch Operation Optimization Demo");
batch_optimization_demo(&client)?;
println!("\n๐พ Local Cache Integration Demo");
cache_integration_demo(&client)?;
println!("\n๐ Performance Statistics");
show_stats(&client)?;
println!("\n๐ All demos completed!");
client.shutdown()?;
println!("๐ Client closed");
Ok(())
}
fn stream_demo(client: &RedissonClient) -> RedissonResult<()> {
println!(" 1. Creating order stream...");
let order_stream = client.get_stream::<Order>("orders:stream");
order_stream.create_group("order-processors", "0")?;
println!(" โ
Created consumer group: order-processors");
println!(" 2. Publishing order messages...");
let order1 = Order {
id: "ORD-001".to_string(),
customer_id: "CUST-001".to_string(),
amount: 299.99,
items: vec![
OrderItem {
product_id: "PROD-001".to_string(),
quantity: 2,
price: 149.99,
},
],
status: OrderStatus::Pending,
};
let mut fields1 = HashMap::new();
fields1.insert("order".to_string(), order1.clone());
let message_id1 = order_stream.add_auto_id(&fields1)?;
println!(" ๐จ Order 1 message ID: {}", message_id1);
let order2 = Order {
id: "ORD-002".to_string(),
customer_id: "CUST-002".to_string(),
amount: 599.99,
items: vec![
OrderItem {
product_id: "PROD-002".to_string(),
quantity: 1,
price: 599.99,
},
],
status: OrderStatus::Pending,
};
let mut fields2 = HashMap::new();
fields2.insert("order".to_string(), order2.clone());
let message_id2 = order_stream.add_auto_id(&fields2)?;
println!(" ๐จ Order 2 message ID: {}", message_id2);
println!(" 3. Consuming order messages...");
let messages = order_stream.read_group("order-processors", "consumer-1", Some(10), None, false)?;
println!(" ๐ฅ Received {} messages", messages.len());
for (i, message) in messages.iter().enumerate() {
if let Some(order_field) = message.fields.get("order") {
println!(" ๐ Message {}: Order ID: {}, Amount: ${}",
i + 1, order_field.id, order_field.amount);
order_stream.ack("order-processors", &[message.id.clone()])?;
}
}
println!(" 4. Getting stream information...");
let info = order_stream.info()?;
println!(" ๐ Stream length: {}", info.length);
println!(" ๐ฅ Number of consumer groups: {}", info.groups);
println!(" 5. Trimming stream...");
let trimmed = order_stream.trim(1000, true)?;
println!(" โ๏ธ Trimmed {} messages", trimmed);
Ok(())
}
fn batch_optimization_demo(client: &RedissonClient) -> RedissonResult<()> {
println!(" 1. Creating batch operation...");
let mut batch = &mut client.create_batch();
let start = std::time::Instant::now();
for i in 1..=100 {
let key = format!("batch:user:{}", i);
let value = format!("User {}", i);
batch = batch.set(&key, &value);
if i % 10 == 0 {
batch = batch.get::<String>(key.to_string());
}
if i % 20 == 0 {
batch = batch.expire(key, 3600);
}
}
println!(" ๐ Added 100 SET operations, 10 GET operations, and 5 EXPIRE operations");
println!(" 2. Executing batch operation...");
let results = batch.execute()?.unwrap_or_default();
let duration = start.elapsed();
println!(" โก Batch execution duration: {:?}", duration);
println!(" ๐ Number of results returned: {}", results.len());
let set_count = results.iter()
.filter(|r| !matches!(r, BatchResult::Error(_)))
.count();
let get_count = results.iter()
.filter(|r| !matches!(r, BatchResult::Error(_)))
.count();
println!(" โ
SET operations successful: {}", set_count);
println!(" โ
GET results: {}", get_count);
println!(" 3. Priority batch operations...");
let batch_optimizer = client.get_batch_processor();
for i in 1..=5 {
let key = format!("priority:high:{}", i);
let value = format!("High Priority {}", i);
batch_optimizer.exec_batch(
vec![Box::new(SetCommand::new(key, value).with_ttl(Duration::from_secs(300)))]
)?;
}
println!(" ๐ Added 5 high priority operations");
for i in 1..=20 {
let key = format!("priority:normal:{}", i);
let value = format!("Normal Priority {}", i);
batch_optimizer.exec_batch(
vec![Box::new(SetCommand::new(key, value))]
)?;
}
println!(" ๐ Added 20 normal priority operations");
batch_optimizer.flush()?;
println!(" ๐ Batch flush completed");
Ok(())
}
fn cache_integration_demo(client: &RedissonClient) -> RedissonResult<()> {
println!(" 1. Creating integrated cache...");
let user_cache = client.get_cache::<String, UserSession>("user_sessions");
println!(" ๐พ Cache created successfully (read-through/write-through mode)");
println!(" 2. Setting cache data...");
let session = UserSession {
user_id: "user123".to_string(),
session_token: "abc123def456".to_string(),
created_at: 1234567890,
last_activity: 1234567990,
ip_address: "192.168.1.100".to_string(),
};
user_cache.set("user123".to_string(), session.clone())?;
println!(" ๐พ Session data cached");
println!(" 3. Reading cached data...");
let start = std::time::Instant::now();
let cached_session = user_cache.get(&"user123".to_string())?;
let first_duration = start.elapsed();
if let Some(session) = cached_session {
println!(" โ
Cache hit: User {}", session.user_id);
println!(" โก First read duration: {:?}", first_duration);
}
println!(" 4. Reading again (local cache)...");
let start = std::time::Instant::now();
let _cached_session2 = user_cache.get(&"user123".to_string())?;
let second_duration = start.elapsed();
println!(" โก Second read duration: {:?}", second_duration);
println!(" ๐ Performance improvement: {:.1}x",
first_duration.as_nanos() as f64 / second_duration.as_nanos() as f64);
println!(" 5. Cache statistics...");
let cache_stats = user_cache.get_local_cache().get_stats();
let client_stats = client.get_stats();
println!(" ๐ Local cache hit rate: {:.1}%",
client_stats.cache_stats.avg_hit_rate * 100.0);
println!(" ๐ฟ Local cache entries: {}", cache_stats.total_entries);
println!(" ๐ฅ Local cache hits: {}", cache_stats.total_hits);
println!(" 6. Clearing cache...");
user_cache.clear()?;
println!(" ๐งน Cache cleared");
Ok(())
}
fn show_stats(client: &RedissonClient) -> RedissonResult<()> {
let stats = client.get_stats();
println!(" Connection Pool Statistics:");
println!(" ๐ Total connections created: {}", stats.connection_stats.total_connections_created);
println!(" ๐ Connection reuse rate: {:.1}%",
stats.connection_stats.total_connections_reused as f64 /
stats.connection_stats.total_operations as f64 * 100.0);
println!(" โก Average wait time: {:.2}ms", stats.connection_stats.total_wait_time_ms);
println!(" ๐ Peak connections: {}", stats.connection_stats.peak_connections);
println!("\n Batch Operation Statistics:");
println!(" ๐ฆ Total batches: {}", stats.batch_stats.total_batches);
println!(" ๐ Total commands: {}", stats.batch_stats.total_commands);
println!(" ๐ Average batch size: {:.1}", stats.batch_stats.avg_batch_size);
println!(" โฑ๏ธ Average execution time: {:.2}ms", stats.batch_stats.avg_execution_time_ms);
println!("\n Cache Statistics:");
println!(" ๐ฏ Total hits: {}", stats.cache_stats.total_hits);
println!(" โ Total misses: {}", stats.cache_stats.total_misses);
println!(" ๐ Hit rate: {:.1}%", stats.cache_stats.avg_hit_rate * 100.0);
println!(" ๐๏ธ Total evictions: {}", stats.cache_stats.total_evictions);
Ok(())
}