use crate::client::stats::ClientStats;
use crate::{AsyncBatchProcessor, AsyncRAtomicLong, AsyncRBatch, AsyncRBitSet, AsyncRBlockingQueue, AsyncRBloomFilter, AsyncRBucket, AsyncRCountDownLatch, AsyncRFairLock, AsyncRGeo, AsyncRKeys, AsyncRList, AsyncRLock, AsyncRMap, AsyncRMultiLock, AsyncRRateLimiter, AsyncRReadWriteLock, AsyncRRedLock, AsyncRSemaphore, AsyncRSet, AsyncRSortedSet, AsyncRStream, AsyncRTopic, AsyncRedisConnectionManager, AsyncTransactionBuilder, AsyncTransactionContext, RedissonConfig, RedissonResult};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
pub struct AsyncRedissonClient {
config: RedissonConfig,
connection_manager: Arc<AsyncRedisConnectionManager>,
batch_processor: Arc<AsyncBatchProcessor>,
#[cfg(feature = "caching")]
cache_manager: Arc<crate::AsyncLocalCacheManager<String, String>>,
}
impl AsyncRedissonClient {
pub async fn new(config: RedissonConfig) -> RedissonResult<Self> {
let connection_manager = Arc::new(AsyncRedisConnectionManager::new(&config).await?);
let batch_optimizer = AsyncBatchProcessor::new(connection_manager.clone(), config.batch_config.clone().unwrap_or_default()).await?;
Ok(Self {
config,
connection_manager,
batch_processor: Arc::new(batch_optimizer),
#[cfg(feature = "caching")]
cache_manager: Arc::new(crate::AsyncLocalCacheManager::new(
Duration::from_secs(300),
1000,
)),
})
}
pub fn get_batch_processor(&self) -> &Arc<AsyncBatchProcessor> {
&self.batch_processor
}
pub fn get_bucket<V>(&self, name: &str) -> AsyncRBucket<V>
where
V: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
{
AsyncRBucket::new(self.connection_manager.clone(), name.to_string())
}
pub fn get_map<K, V>(&self, name: &str) -> AsyncRMap<K, V>
where
K: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + Eq + std::hash::Hash + 'static,
V: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
{
AsyncRMap::new(self.connection_manager.clone(), name.to_string())
}
pub fn get_list<V: for<'de> DeserializeOwned + Serialize + Send + Sync + 'static>(&self, name: &str) -> AsyncRList<V> {
AsyncRList::new(self.connection_manager.clone(), name.to_string())
}
pub fn get_set<V: Serialize + DeserializeOwned + Eq + std::hash::Hash + Send + Sync + 'static>(&self, name: &str) -> AsyncRSet<V> {
AsyncRSet::new(self.connection_manager.clone(), name.to_string())
}
pub fn get_sorted_set<V: Serialize + DeserializeOwned + Send + Sync + 'static>(&self, name: &str) -> AsyncRSortedSet<V> {
AsyncRSortedSet::new(self.connection_manager.clone(), name.to_string())
}
pub fn get_bloom_filter<V: AsRef<[u8]> + Send + Sync + 'static>(&self, name: &str, expected_insertions: usize, false_positive_rate: f64) -> AsyncRBloomFilter<V> {
AsyncRBloomFilter::new(self.connection_manager.clone(), name.to_string(), expected_insertions, false_positive_rate)
}
pub fn get_atomic_long(&self, name: &str) -> AsyncRAtomicLong {
AsyncRAtomicLong::new(self.connection_manager.clone(), name.to_string())
}
pub async fn get_semaphore(&self, name: &str, max_permits: usize) -> AsyncRSemaphore {
AsyncRSemaphore::new(self.connection_manager.clone(), name.to_string(), max_permits).await
}
pub fn get_rate_limiter(&self, name: &str, rate: f64, capacity: f64) -> AsyncRRateLimiter {
AsyncRRateLimiter::new(self.connection_manager.clone(), name.to_string(), rate, capacity)
}
pub async fn get_count_down_latch(&self, name: &str, count: i32) -> AsyncRCountDownLatch {
AsyncRCountDownLatch::new(self.connection_manager.clone(), name.to_string(), count).await
}
pub fn get_bit_set(&self, name: &str) -> AsyncRBitSet {
AsyncRBitSet::new(self.connection_manager.clone(), name.to_string())
}
pub fn get_geo<V>(&self, name: &str) -> AsyncRGeo<V>
where
V: Serialize + DeserializeOwned + Send + Sync + Default + 'static,
{
AsyncRGeo::new(self.connection_manager.clone(), name.to_string())
}
pub fn get_topic<V>(&self, name: &str) -> AsyncRTopic<V>
where V: Serialize + DeserializeOwned + Send + Sync + 'static + Clone {
AsyncRTopic::new(self.connection_manager.clone(), name.to_string())
}
pub fn get_blocking_queue<V>(&self, name: &str) -> AsyncRBlockingQueue<V>
where
V: Serialize + DeserializeOwned + Send + Sync + 'static,
{
AsyncRBlockingQueue::new(self.connection_manager.clone(), name.to_string())
}
pub fn get_lock(&self, name: &str) -> AsyncRLock {
AsyncRLock::new(self.connection_manager.clone(), name.to_string(), self.config.watchdog_timeout)
}
pub fn get_fair_lock(&self, name: &str) -> AsyncRFairLock {
AsyncRFairLock::new(self.connection_manager.clone(), name.to_string(), self.config.watchdog_timeout)
}
pub fn get_multi_lock(&self, names: Vec<String>) -> AsyncRMultiLock {
let locks = names.iter()
.map(|name| self.get_lock(name))
.collect();
AsyncRMultiLock::new(locks)
}
pub fn get_red_lock(&self, names: String) -> AsyncRRedLock {
AsyncRRedLock::new(vec![self.connection_manager.clone()], names, self.config.watchdog_timeout)
}
pub fn get_read_write_lock(&self, name: &str, lease_time: Duration) -> AsyncRReadWriteLock {
AsyncRReadWriteLock::new(self.connection_manager.clone(), name.to_string(), lease_time)
}
pub async fn execute_transaction(&self, transaction_func: impl FnOnce(&mut AsyncTransactionContext) -> RedissonResult<()>) -> RedissonResult<()> {
let mut context = AsyncTransactionBuilder::new(self.connection_manager.clone()).build().await;
transaction_func(&mut context)?;
Ok(())
}
pub fn get_keys(&self) -> AsyncRKeys {
AsyncRKeys::new(self.connection_manager.clone())
}
pub fn get_stream<V>(&self, name: &str) -> AsyncRStream<V>
where
V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
AsyncRStream::new(self.connection_manager.clone(), name.to_string())
}
pub fn create_batch(&self) -> AsyncRBatch {
AsyncRBatch::new(self.batch_processor.clone())
}
#[cfg(feature = "caching")]
pub fn get_cache<K, V>(&self, name: &str) -> crate::AsyncRedisIntegratedCache<K, V>
where
K: Eq + Hash + Clone + Serialize + DeserializeOwned + std::fmt::Debug + Send + Sync + 'static,
V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static
{
crate::AsyncRedisIntegratedCache::<K, V>::new(
self.connection_manager.clone(),
name,
Duration::from_secs(300),
1000,
)
}
pub async fn get_stats(&self) -> ClientStats {
ClientStats {
connection_stats: self.connection_manager.get_stats().await,
batch_stats: self.batch_processor.get_stats().await,
#[cfg(feature = "caching")]
cache_stats: self.cache_manager.get_stats().await,
}
}
pub async fn shutdown(&self) -> RedissonResult<()> {
self.batch_processor.close().await?;
Ok(())
}
}
impl Clone for AsyncRedissonClient {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
connection_manager: self.connection_manager.clone(),
batch_processor: self.batch_processor.clone(),
#[cfg(feature = "caching")]
cache_manager: self.cache_manager.clone(),
}
}
}