use rs2_stream::state::traits::StateStorageType;
use rs2_stream::state::{InMemoryState, StateConfig, StateStorage};
use std::sync::Arc;
use std::time::Duration;
use tokio;
#[tokio::test]
async fn test_basic_storage_operations() {
let storage = InMemoryState::new(Duration::from_secs(60));
storage.set("key1", b"value1").await.unwrap();
let value = storage.get("key1").await;
assert_eq!(value, Some(b"value1".to_vec()));
let value = storage.get("nonexistent").await;
assert_eq!(value, None);
storage.set("key1", b"updated_value").await.unwrap();
let value = storage.get("key1").await;
assert_eq!(value, Some(b"updated_value".to_vec()));
}
#[tokio::test]
async fn test_exists_and_delete() {
let storage = InMemoryState::new(Duration::from_secs(60));
assert!(!storage.exists("key1").await);
storage.set("key1", b"value1").await.unwrap();
assert!(storage.exists("key1").await);
storage.delete("key1").await.unwrap();
assert!(!storage.exists("key1").await);
let value = storage.get("key1").await;
assert_eq!(value, None);
}
#[tokio::test]
async fn test_ttl_expiration() {
let storage = InMemoryState::new(Duration::from_millis(100));
storage.set("expiring_key", b"value").await.unwrap();
assert!(storage.exists("expiring_key").await);
tokio::time::sleep(Duration::from_millis(150)).await;
assert!(!storage.exists("expiring_key").await);
let value = storage.get("expiring_key").await;
assert_eq!(value, None);
}
#[tokio::test]
async fn test_concurrent_access() {
let storage = Arc::new(InMemoryState::new(Duration::from_secs(60)));
let mut handles = Vec::new();
for i in 0..10 {
let storage_clone = Arc::clone(&storage);
let handle = tokio::spawn(async move {
let key = format!("key_{}", i);
let value = format!("value_{}", i);
storage_clone.set(&key, value.as_bytes()).await.unwrap();
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
for i in 0..10 {
let key = format!("key_{}", i);
let expected_value = format!("value_{}", i);
let value = storage.get(&key).await;
assert_eq!(value, Some(expected_value.as_bytes().to_vec()));
}
}
#[tokio::test]
async fn test_large_values() {
let storage = InMemoryState::new(Duration::from_secs(60));
let large_value = vec![b'x'; 1024 * 1024]; storage.set("large_key", &large_value).await.unwrap();
let retrieved = storage.get("large_key").await;
assert_eq!(retrieved, Some(large_value));
}
#[tokio::test]
async fn test_special_characters() {
let storage = InMemoryState::new(Duration::from_secs(60));
let special_value = "test@example.com:password123!@#";
storage
.set("special_key", special_value.as_bytes())
.await
.unwrap();
let retrieved = storage.get("special_key").await;
assert_eq!(retrieved, Some(special_value.as_bytes().to_vec()));
}
#[tokio::test]
async fn test_empty_values() {
let storage = InMemoryState::new(Duration::from_secs(60));
storage.set("", b"empty_key_value").await.unwrap();
let retrieved = storage.get("").await;
assert_eq!(retrieved, Some(b"empty_key_value".to_vec()));
storage.set("empty_value_key", b"").await.unwrap();
let retrieved = storage.get("empty_value_key").await;
assert_eq!(retrieved, Some(vec![]));
}
#[tokio::test]
async fn test_unicode_values() {
let storage = InMemoryState::new(Duration::from_secs(60));
let unicode_value = "Hello, 世界! 🌍";
storage
.set("unicode_key", unicode_value.as_bytes())
.await
.unwrap();
let retrieved = storage.get("unicode_key").await;
assert_eq!(retrieved, Some(unicode_value.as_bytes().to_vec()));
}
#[tokio::test]
async fn test_multiple_operations() {
let storage = InMemoryState::new(Duration::from_secs(60));
for i in 0..100 {
let key = format!("key_{}", i);
let value = format!("value_{}", i);
storage.set(&key, value.as_bytes()).await.unwrap();
}
for i in 0..100 {
let key = format!("key_{}", i);
let expected_value = format!("value_{}", i);
let value = storage.get(&key).await;
assert_eq!(value, Some(expected_value.as_bytes().to_vec()));
}
for i in 0..50 {
let key = format!("key_{}", i);
storage.delete(&key).await.unwrap();
}
for i in 0..50 {
let key = format!("key_{}", i);
let value = storage.get(&key).await;
assert_eq!(value, None);
}
for i in 50..100 {
let key = format!("key_{}", i);
let expected_value = format!("value_{}", i);
let value = storage.get(&key).await;
assert_eq!(value, Some(expected_value.as_bytes().to_vec()));
}
}
#[tokio::test]
async fn test_storage_with_config() {
let config = StateConfig {
storage_type: StateStorageType::InMemory,
ttl: Duration::from_secs(30),
cleanup_interval: Duration::from_secs(5),
max_size: Some(100),
custom_storage: None,
};
let storage = InMemoryState::new(Duration::from_secs(60));
storage.set("test_key", b"test_value").await.unwrap();
let value = storage.get("test_key").await;
assert_eq!(value, Some(b"test_value".to_vec()));
}
#[tokio::test]
async fn test_storage_error_handling() {
let storage = InMemoryState::new(Duration::from_secs(60));
storage.delete("nonexistent").await.unwrap();
storage.set("test", b"value").await.unwrap();
let value = storage.get("test").await;
assert_eq!(value, Some(b"value".to_vec()));
storage.delete("test").await.unwrap();
let value = storage.get("test").await;
assert_eq!(value, None);
}
#[tokio::test]
async fn test_storage_performance() {
let storage = InMemoryState::new(Duration::from_secs(60));
let start = std::time::Instant::now();
for i in 0..1000 {
let key = format!("perf_key_{}", i);
let value = format!("perf_value_{}", i);
storage.set(&key, value.as_bytes()).await.unwrap();
}
let write_duration = start.elapsed();
println!("Wrote 1000 keys in {:?}", write_duration);
let start = std::time::Instant::now();
for i in 0..1000 {
let key = format!("perf_key_{}", i);
let expected_value = format!("perf_value_{}", i);
let value = storage.get(&key).await;
assert_eq!(value, Some(expected_value.as_bytes().to_vec()));
}
let read_duration = start.elapsed();
println!("Read 1000 keys in {:?}", read_duration);
assert!(write_duration.as_millis() < 1000);
assert!(read_duration.as_millis() < 1000);
}