use crate::{convert_value_to_batch_result, AsyncBatchProcessor, AsyncRedisConnectionManager, BatchConfig, BatchResult, BatchStats, CommandBuilder, DelCommand, ExpireCommand, GenericCommand, GetCommand, HGetCommand, HSetCommand, IncrByCommand, LPushCommand, RedissonError, RedissonResult, SAddCommand, SetCommand, TransactionConfig, TransactionResult};
use redis::Value;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time;
use uuid::Uuid;
/// Accumulates watched keys and queued commands for a Redis transaction and
/// runs them via [`AsyncTransactionContext::execute`].
pub struct AsyncTransactionContext {
/// Source of the connections used for WATCH / MULTI / EXEC and direct reads.
manager: Arc<AsyncRedisConnectionManager>,
/// Keys passed to `WATCH` before each execution attempt.
watch_keys: Vec<String>,
/// Commands queued by the builder-style methods, in insertion order.
commands: Vec<Box<dyn CommandBuilder>>,
/// Set by `read_only()` and reset by `clear()`.
/// NOTE(review): no method shown here reads this flag — confirm whether it is used elsewhere.
read_only: bool,
/// Incremented once per retry performed by `execute()`; reset by `clear()`.
optimistic_lock_attempts: usize,
/// Processor that the queued commands are dispatched through during execution.
batch_processor: Arc<AsyncBatchProcessor>,
/// Retry, backoff, timeout and watch settings consulted by this context.
transaction_config: TransactionConfig,
}
impl AsyncTransactionContext {
/// Creates an empty transaction context using the default transaction and
/// batch configurations.
///
/// # Panics
/// Panics if `AsyncBatchProcessor::new` returns an error.
pub async fn new(manager: Arc<AsyncRedisConnectionManager>) -> Self {
    let default_batch = BatchConfig::default();
    let default_transaction = TransactionConfig::default();
    let processor = AsyncBatchProcessor::new(Arc::clone(&manager), default_batch)
        .await
        .expect("Failed to create async batch processor");
    Self {
        batch_processor: Arc::new(processor),
        transaction_config: default_transaction,
        manager,
        watch_keys: Vec::new(),
        commands: Vec::new(),
        read_only: false,
        optimistic_lock_attempts: 0,
    }
}
/// Creates an empty transaction context with an explicit transaction
/// configuration.
///
/// `batch_config` falls back to `BatchConfig::default()` when `None`.
///
/// # Panics
/// Panics if `AsyncBatchProcessor::new` returns an error.
pub async fn with_config(
    manager: Arc<AsyncRedisConnectionManager>,
    transaction_config: TransactionConfig,
    batch_config: Option<BatchConfig>,
) -> Self {
    let effective_batch = batch_config.unwrap_or_default();
    let processor = AsyncBatchProcessor::new(Arc::clone(&manager), effective_batch)
        .await
        .expect("Failed to create async batch processor");
    Self {
        batch_processor: Arc::new(processor),
        transaction_config,
        manager,
        watch_keys: Vec::new(),
        commands: Vec::new(),
        read_only: false,
        optimistic_lock_attempts: 0,
    }
}
/// Replaces the transaction configuration and returns `self` for chaining.
///
/// Keys already recorded in the watch list are left untouched.
pub fn set_config(&mut self, config: TransactionConfig) -> &mut Self {
    let slot = &mut self.transaction_config;
    *slot = config;
    self
}
/// Builds a new batch processor from `config` and installs it in place of
/// the current one.
///
/// This context's reference to the previous processor is dropped; no
/// `close()` is called on it here.
///
/// # Errors
/// Propagates the error from `AsyncBatchProcessor::new`; the existing
/// processor stays in place in that case.
pub async fn set_batch_config(&mut self, config: BatchConfig) -> RedissonResult<&mut Self> {
    let replacement = AsyncBatchProcessor::new(Arc::clone(&self.manager), config).await?;
    self.batch_processor = Arc::new(replacement);
    Ok(self)
}
/// Adds `key` to the list of keys passed to `WATCH` on execution.
///
/// Does nothing when watching is disabled in the transaction configuration
/// or when `key` is already in the list (matching the de-duplication that
/// `query` and `hquery` perform), so `watch_count()` counts distinct keys.
pub fn watch(&mut self, key: &str) -> &mut Self {
    if self.transaction_config.enable_watch {
        let already_watched = self.watch_keys.iter().any(|k| k.as_str() == key);
        if !already_watched {
            self.watch_keys.push(key.to_string());
        }
    }
    self
}
/// Adds every key in `keys` to the list of keys passed to `WATCH` on
/// execution.
///
/// Does nothing when watching is disabled in the transaction configuration.
/// Keys already in the list (including repeats within `keys`) are skipped,
/// matching the de-duplication that `query` and `hquery` perform.
pub fn watch_multi(&mut self, keys: &[&str]) -> &mut Self {
    if self.transaction_config.enable_watch {
        for key in keys {
            let already_watched = self.watch_keys.iter().any(|k| k.as_str() == *key);
            if !already_watched {
                self.watch_keys.push((*key).to_string());
            }
        }
    }
    self
}
/// Sets the context's `read_only` flag and returns `self` for chaining.
///
/// The flag stays set until `clear()` resets it.
pub fn read_only(&mut self) -> &mut Self {
    let flag = &mut self.read_only;
    *flag = true;
    self
}
/// Reads `key` immediately with `GET` (outside any queued commands) and
/// deserializes the stored JSON into `T`.
///
/// When watching is enabled, the key is also added to the watch list if it
/// is not already present.
///
/// # Errors
/// * Propagates connection and Redis errors.
/// * `RedissonError::SerializationError` if the stored string is not valid
///   JSON for `T`.
/// * `RedissonError::InvalidOperation` carrying the key when the key has no
///   value.
pub async fn query<K, T>(&mut self, key: K) -> RedissonResult<T>
where
    K: ToString,
    T: for<'de> Deserialize<'de>,
{
    let key_str = key.to_string();
    // The connection is acquired first so that a connection failure leaves
    // the watch list unchanged.
    let mut conn = self.manager.get_connection().await?;
    let should_track =
        self.transaction_config.enable_watch && !self.watch_keys.contains(&key_str);
    if should_track {
        self.watch_keys.push(key_str.clone());
    }
    let stored: Option<String> = redis::cmd("GET")
        .arg(&key_str)
        .query_async(&mut conn)
        .await?;
    let json_str = match stored {
        Some(text) => text,
        None => return Err(RedissonError::InvalidOperation(key_str)),
    };
    serde_json::from_str(&json_str).map_err(|e| RedissonError::SerializationError(e.to_string()))
}
/// Reads hash field `field` of `key` immediately with `HGET` (outside any
/// queued commands) and deserializes the stored JSON into `T`.
///
/// When watching is enabled, the key is also added to the watch list if it
/// is not already present.
///
/// # Errors
/// * Propagates connection and Redis errors.
/// * `RedissonError::SerializationError` if the stored string is not valid
///   JSON for `T`.
/// * `RedissonError::InvalidOperation` carrying `"<key>.<field>"` when the
///   field has no value.
pub async fn hquery<K, F, T>(&mut self, key: K, field: F) -> RedissonResult<T>
where
    K: ToString,
    F: ToString,
    T: for<'de> Deserialize<'de>,
{
    let key_str = key.to_string();
    let field_str = field.to_string();
    // The connection is acquired first so that a connection failure leaves
    // the watch list unchanged.
    let mut conn = self.manager.get_connection().await?;
    let should_track =
        self.transaction_config.enable_watch && !self.watch_keys.contains(&key_str);
    if should_track {
        self.watch_keys.push(key_str.clone());
    }
    let stored: Option<String> = redis::cmd("HGET")
        .arg(&key_str)
        .arg(&field_str)
        .query_async(&mut conn)
        .await?;
    let json_str = match stored {
        Some(text) => text,
        None => {
            return Err(RedissonError::InvalidOperation(format!("{}.{}", key_str, field_str)));
        }
    };
    serde_json::from_str(&json_str).map_err(|e| RedissonError::SerializationError(e.to_string()))
}
/// Queues a `get` for `key`, executes the transaction, and deserializes the
/// last result as JSON into `T`.
///
/// The queued `get` remains in the command list after this call; nothing
/// here removes it.
///
/// # Errors
/// * Propagates any error from `execute`.
/// * `RedissonError::SerializationError` if the last result is a string that
///   is not valid JSON for `T`.
/// * `RedissonError::InvalidOperation` if the last result is nil, has an
///   unexpected type, or no results were returned.
pub async fn exec_and_get<K, T>(&mut self, key: K) -> RedissonResult<T>
where
    K: ToString,
    T: for<'de> Deserialize<'de>,
{
    self.get(key);
    let outcome = self.execute().await?;
    match outcome.results.last() {
        Some(BatchResult::String(payload)) => serde_json::from_str(payload)
            .map_err(|e| RedissonError::SerializationError(e.to_string())),
        Some(BatchResult::Nil) => {
            Err(RedissonError::InvalidOperation("Key Data not exist".to_string()))
        }
        Some(_) => Err(RedissonError::InvalidOperation("Unexpected result type".to_string())),
        None => Err(RedissonError::InvalidOperation("No results returned".to_string())),
    }
}
/// Executes the transaction and returns the full `TransactionResult`.
///
/// This is a thin alias for `execute`.
///
/// # Errors
/// Propagates any error from `execute`.
pub async fn exec_and_get_all(&mut self) -> RedissonResult<TransactionResult> {
    let outcome = self.execute().await?;
    Ok(outcome)
}
/// Queues a `SetCommand` storing the JSON encoding of `value` under `key`.
///
/// # Errors
/// Returns the converted `serde_json` error if `value` cannot be
/// serialized; nothing is queued in that case.
pub fn set<K, V>(&mut self, key: K, value: V) -> RedissonResult<&mut Self>
where
    K: ToString,
    V: Serialize,
{
    let encoded = serde_json::to_string(&value)?;
    self.commands.push(Box::new(SetCommand::new(key, encoded)));
    Ok(self)
}
/// Queues a `SetCommand` with a TTL of `ttl`, storing the JSON encoding of
/// `value` under `key`.
///
/// # Errors
/// Returns the converted `serde_json` error if `value` cannot be
/// serialized; nothing is queued in that case.
pub fn set_ex<K, V>(&mut self, key: K, value: V, ttl: Duration) -> RedissonResult<&mut Self>
where
    K: ToString,
    V: Serialize,
{
    let encoded = serde_json::to_string(&value)?;
    let expiring_set = SetCommand::new(key, encoded).with_ttl(ttl);
    self.commands.push(Box::new(expiring_set));
    Ok(self)
}
/// Queues a generic `SETNX key <json>` command, where `<json>` is the JSON
/// encoding of `value`.
///
/// # Errors
/// Returns the converted `serde_json` error if `value` cannot be
/// serialized; nothing is queued in that case.
pub fn set_nx<K, V>(&mut self, key: K, value: V) -> RedissonResult<&mut Self>
where
    K: ToString,
    V: Serialize,
{
    let encoded = serde_json::to_string(&value)?;
    let key_str = key.to_string();
    let args = ["SETNX", key_str.as_str(), encoded.as_str()];
    self.commands.push(Box::new(GenericCommand::new(&args, true)));
    Ok(self)
}
/// Queues a `GetCommand` for `key` and returns `self` for chaining.
pub fn get<K>(&mut self, key: K) -> &mut Self
where
    K: ToString,
{
    self.commands.push(Box::new(GetCommand::new(key)));
    self
}
/// Queues a `DelCommand` for `key` and returns `self` for chaining.
pub fn del<K>(&mut self, key: K) -> &mut Self
where
    K: ToString,
{
    self.commands.push(Box::new(DelCommand::new(key)));
    self
}
/// Queues a single `DelCommand` covering every key yielded by `keys`.
///
/// Keys are converted to strings in iteration order.
pub fn del_multi<K, I>(&mut self, keys: I) -> &mut Self
where
    K: ToString,
    I: IntoIterator<Item = K>,
{
    let mut key_names: Vec<String> = Vec::new();
    for key in keys {
        key_names.push(key.to_string());
    }
    self.commands.push(Box::new(DelCommand::multiple(&key_names)));
    self
}
/// Queues an `IncrByCommand` for `key` with the given `delta` and returns
/// `self` for chaining.
pub fn incr<K>(&mut self, key: K, delta: i64) -> &mut Self
where
    K: ToString,
{
    self.commands.push(Box::new(IncrByCommand::new(key, delta)));
    self
}
/// Queues an `HSetCommand` storing the JSON encoding of `value` in `field`
/// of hash `key`.
///
/// # Errors
/// Returns the converted `serde_json` error if `value` cannot be
/// serialized; nothing is queued in that case.
pub fn hset<K, F, V>(&mut self, key: K, field: F, value: V) -> RedissonResult<&mut Self>
where
    K: ToString,
    F: ToString,
    V: Serialize,
{
    let encoded = serde_json::to_string(&value)?;
    self.commands.push(Box::new(HSetCommand::new(key, field, encoded)));
    Ok(self)
}
/// Queues an `HGetCommand` for `field` of hash `key` and returns `self` for
/// chaining.
pub fn hget<K, F>(&mut self, key: K, field: F) -> &mut Self
where
    K: ToString,
    F: ToString,
{
    self.commands.push(Box::new(HGetCommand::new(key, field)));
    self
}
/// Queues a generic `HDEL key field...` command for every field yielded by
/// `fields`.
pub fn hdel<K, F, I>(&mut self, key: K, fields: I) -> &mut Self
where
    K: ToString,
    F: ToString,
    I: IntoIterator<Item = F>,
{
    // Fields are stringified before the key, then appended after it.
    let field_names: Vec<String> = fields.into_iter().map(|f| f.to_string()).collect();
    let mut args: Vec<String> = Vec::with_capacity(field_names.len() + 2);
    args.push("HDEL".to_string());
    args.push(key.to_string());
    args.extend(field_names);
    self.commands.push(Box::new(GenericCommand::new(&args, true)));
    self
}
/// Queues a single `SAddCommand` adding the JSON encoding of each member to
/// set `key`.
///
/// # Errors
/// Returns the converted `serde_json` error for the first member that
/// cannot be serialized; nothing is queued in that case.
pub fn sadd<K, V, I>(&mut self, key: K, members: I) -> RedissonResult<&mut Self>
where
    K: ToString,
    V: Serialize,
    I: IntoIterator<Item = V>,
{
    let mut encoded_members: Vec<String> = Vec::new();
    for member in members {
        encoded_members.push(serde_json::to_string(&member)?);
    }
    self.commands.push(Box::new(SAddCommand::multiple(key, &encoded_members)));
    Ok(self)
}
/// Queues a generic `SREM key member...` command, where each member is the
/// JSON encoding of an item from `members`.
///
/// # Errors
/// Returns the converted `serde_json` error for the first member that
/// cannot be serialized; nothing is queued in that case.
pub fn srem<K, V, I>(&mut self, key: K, members: I) -> RedissonResult<&mut Self>
where
    K: ToString,
    V: Serialize,
    I: IntoIterator<Item = V>,
{
    // Members are serialized before the key is stringified.
    let mut encoded_members: Vec<String> = Vec::new();
    for member in members {
        encoded_members.push(serde_json::to_string(&member)?);
    }
    let mut args: Vec<String> = Vec::with_capacity(encoded_members.len() + 2);
    args.push("SREM".to_string());
    args.push(key.to_string());
    args.extend(encoded_members);
    self.commands.push(Box::new(GenericCommand::new(&args, true)));
    Ok(self)
}
/// Queues a single `LPushCommand` pushing the JSON encoding of each value
/// onto list `key`.
///
/// # Errors
/// Returns the converted `serde_json` error for the first value that cannot
/// be serialized; nothing is queued in that case.
pub fn lpush<K, V, I>(&mut self, key: K, values: I) -> RedissonResult<&mut Self>
where
    K: ToString,
    V: Serialize,
    I: IntoIterator<Item = V>,
{
    let mut encoded_values: Vec<String> = Vec::new();
    for value in values {
        encoded_values.push(serde_json::to_string(&value)?);
    }
    self.commands.push(Box::new(LPushCommand::multiple(key, &encoded_values)));
    Ok(self)
}
/// Queues a generic `LPOP key` command and returns `self` for chaining.
pub fn lpop<K>(&mut self, key: K) -> &mut Self
where
    K: ToString,
{
    let key_str = key.to_string();
    let args = ["LPOP", key_str.as_str()];
    self.commands.push(Box::new(GenericCommand::new(&args, true)));
    self
}
/// Queues an `ExpireCommand` for `key` with the given `seconds` value and
/// returns `self` for chaining.
pub fn expire<K>(&mut self, key: K, seconds: i64) -> &mut Self
where
    K: ToString,
{
    self.commands.push(Box::new(ExpireCommand::new(key, seconds)));
    self
}
/// Appends every pre-built command yielded by `commands` to the queue, in
/// iteration order.
pub fn add_commands<I>(&mut self, commands: I) -> &mut Self
where
    I: IntoIterator<Item = Box<dyn CommandBuilder>>,
{
    for command in commands {
        self.commands.push(command);
    }
    self
}
/// Appends one pre-built command to the end of the queue and returns `self`
/// for chaining.
pub fn add_command(&mut self, command: Box<dyn CommandBuilder>) -> &mut Self {
    let queue = &mut self.commands;
    queue.push(command);
    self
}
/// Runs the queued commands, retrying on optimistic-lock conflicts with
/// exponential backoff.
///
/// Each attempt is delegated to `try_execute_once`, which receives the time
/// elapsed since this call started. An attempt is retried when it fails with
/// `RedissonError::TransactionConflict`, or with a `RedissonError::RedisError`
/// whose text contains `"WATCH"` or `"EXECABORT"`. Between attempts the task
/// sleeps for the current backoff, which then doubles up to
/// `max_backoff_ms`. Every retry also increments `optimistic_lock_attempts`.
///
/// The queued commands and watch keys are not cleared by this method.
///
/// # Errors
/// Returns the last attempt's error when it is not retryable or when
/// `max_retries` retries have already been performed.
pub async fn execute(&mut self) -> RedissonResult<TransactionResult> {
    let start_time = Instant::now();
    let transaction_id = Uuid::new_v4().to_string();
    let mut retries = 0;
    let mut backoff_ms = self.transaction_config.initial_backoff_ms;
    loop {
        let err = match self.try_execute_once(start_time.elapsed()).await {
            Ok((results, batches_executed)) => {
                return Ok(TransactionResult {
                    success: true,
                    retries,
                    execution_time: start_time.elapsed(),
                    results,
                    watch_keys: self.watch_keys.clone(),
                    transaction_id,
                    batches_executed,
                });
            }
            Err(e) => e,
        };
        if retries >= self.transaction_config.max_retries {
            return Err(err);
        }
        let should_retry = match &err {
            RedissonError::RedisError(msg) => {
                // Render the error once and test both markers against it.
                let text = msg.to_string();
                text.contains("WATCH") || text.contains("EXECABORT")
            }
            RedissonError::TransactionConflict => true,
            _ => false,
        };
        if !should_retry {
            return Err(err);
        }
        retries += 1;
        time::sleep(Duration::from_millis(backoff_ms)).await;
        // Saturate instead of overflowing when the configured backoff is huge.
        backoff_ms = backoff_ms
            .saturating_mul(2)
            .min(self.transaction_config.max_backoff_ms);
        self.optimistic_lock_attempts += 1;
    }
}
/// Performs a single WATCH / MULTI / EXEC attempt.
///
/// `elapsed` is the time already spent by the caller; if a timeout is
/// configured and `elapsed` exceeds it, the attempt fails immediately with
/// `RedissonError::TimeoutError`.
///
/// Returns the parsed `EXEC` replies together with the number of command
/// chunks that were dispatched.
///
/// # Errors
/// * `RedissonError::TimeoutError` as described above.
/// * `RedissonError::TransactionConflict` when `EXEC` replies with nil.
/// * `RedissonError::InvalidOperation` when `EXEC` replies with anything other
///   than nil or an array.
/// * Any connection, Redis, or batch-processor error, propagated via `?`.
///
/// # Panics
/// `slice::chunks` panics if `max_batch_size` is 0.
///
/// NOTE(review): the queued commands are sent through `self.batch_processor`,
/// not through the `conn` that issued `MULTI`. Whether the processor uses the
/// same underlying connection is not visible here; if it does not, the
/// commands run outside the transaction and `EXEC` has nothing queued —
/// confirm against `AsyncBatchProcessor::query_batch`.
///
/// NOTE(review): an error between `MULTI` and `EXEC` returns early without
/// sending `DISCARD` on `conn` — confirm how the connection manager treats a
/// connection left in that state.
async fn try_execute_once(
&self,
elapsed: Duration,
) -> RedissonResult<(Vec<BatchResult>, usize)> {
// Refuse to start another attempt once the configured timeout has passed.
if let Some(timeout) = self.transaction_config.timeout {
if elapsed > timeout {
return Err(RedissonError::TimeoutError);
}
}
let mut conn = self.manager.get_connection().await?;
// WATCH is only issued when there are keys to watch and watching is enabled.
if !self.watch_keys.is_empty() && self.transaction_config.enable_watch {
redis::cmd("WATCH")
.arg(&self.watch_keys)
.query_async::<()>(&mut conn)
.await?;
}
redis::cmd("MULTI").query_async::<()>(&mut conn).await?;
// Results gathered from the batch processor; they are accumulated here but
// not part of the value returned below (that comes from the EXEC reply).
let mut all_results = Vec::new();
let mut batches_executed = 0;
let batch_config = self.batch_processor.get_batch_config();
for chunk in self.commands.chunks(batch_config.max_batch_size) {
// Commands are cloned so the queue survives for a possible retry.
let chunk_vec = chunk.iter().map(|cmd| cmd.box_clone()).collect::<Vec<_>>();
if batch_config.enable_pipeline {
let results = self.batch_processor.query_batch(chunk_vec).await?;
all_results.extend(results);
} else {
// Without pipelining each command goes out as its own one-element batch.
for cmd in chunk {
let results = self.batch_processor.query_batch(vec![cmd.box_clone()]).await?;
all_results.extend(results);
}
}
// Counted per chunk, regardless of how the chunk was dispatched.
batches_executed += 1;
}
let exec_result: Value = redis::cmd("EXEC")
.query_async(&mut conn)
.await?;
// A nil EXEC reply is reported as a conflict, which `execute` treats as retryable.
if let Value::Nil = exec_result {
return Err(RedissonError::TransactionConflict);
}
let results = match exec_result {
Value::Array(values) => {
self.parse_exec_results(&values).await?
}
_ => {
return Err(RedissonError::InvalidOperation("Transaction execution failed".to_string()));
}
};
Ok((results, batches_executed))
}
/// Converts the `EXEC` reply values into `BatchResult`s.
///
/// At most `self.commands.len()` values are converted; any extra replies are
/// ignored.
///
/// # Errors
/// Stops at, and returns, the first conversion error from
/// `convert_value_to_batch_result`.
async fn parse_exec_results(&self, values: &[Value]) -> RedissonResult<Vec<BatchResult>> {
    let limit = self.commands.len();
    let mut parsed = Vec::with_capacity(values.len().min(limit));
    for raw in values.iter().take(limit) {
        parsed.push(convert_value_to_batch_result(raw.clone())?);
    }
    Ok(parsed)
}
/// Sends `UNWATCH` (when any keys are watched) and `DISCARD` (when any
/// commands are queued) on a connection obtained from the manager.
///
/// The local watch list and command queue are not modified; use `clear()`
/// for that.
///
/// NOTE(review): Redis rejects `DISCARD` outside of `MULTI`, so this
/// presumably relies on the manager returning a connection with an open
/// transaction — confirm.
///
/// # Errors
/// Propagates connection and Redis errors.
pub async fn discard(&mut self) -> RedissonResult<()> {
    let mut conn = self.manager.get_connection().await?;
    let has_watched_keys = !self.watch_keys.is_empty();
    let has_queued_commands = !self.commands.is_empty();
    if has_watched_keys {
        redis::cmd("UNWATCH").query_async::<()>(&mut conn).await?;
    }
    if has_queued_commands {
        redis::cmd("DISCARD").query_async::<()>(&mut conn).await?;
    }
    Ok(())
}
/// Returns the number of commands currently queued.
pub fn command_count(&self) -> usize {
    let queued = &self.commands;
    queued.len()
}
/// Returns the number of entries in the watch list.
pub fn watch_count(&self) -> usize {
    let watched = &self.watch_keys;
    watched.len()
}
/// Resets the context for reuse: empties the command queue and watch list,
/// clears the `read_only` flag, and zeroes the retry counter.
///
/// The transaction configuration and batch processor are kept.
pub fn clear(&mut self) -> &mut Self {
    self.optimistic_lock_attempts = 0;
    self.read_only = false;
    self.watch_keys.clear();
    self.commands.clear();
    self
}
/// Returns how many retries `execute` has performed since construction or
/// the last `clear()`.
pub fn optimistic_lock_attempts(&self) -> usize {
    let attempts = self.optimistic_lock_attempts;
    attempts
}
/// Returns the statistics reported by the underlying batch processor.
pub async fn get_batch_stats(&self) -> BatchStats {
    let processor = &self.batch_processor;
    processor.get_stats().await
}
/// Closes the underlying batch processor.
///
/// # Errors
/// Propagates the error returned by the processor's `close`.
pub async fn close_batch_processor(&self) -> RedissonResult<()> {
    let processor = &self.batch_processor;
    processor.close().await
}
}
/// Fluent builder that collects transaction and batch settings and produces
/// an [`AsyncTransactionContext`].
pub struct AsyncTransactionBuilder {
    /// Connection manager handed to every context built.
    manager: Arc<AsyncRedisConnectionManager>,
    /// Transaction settings applied to built contexts.
    config: TransactionConfig,
    /// Batch settings applied to built contexts.
    batch_config: BatchConfig,
}

impl AsyncTransactionBuilder {
    /// Starts a builder with default transaction and batch configurations.
    pub fn new(manager: Arc<AsyncRedisConnectionManager>) -> Self {
        let config = TransactionConfig::default();
        let batch_config = BatchConfig::default();
        Self {
            manager,
            config,
            batch_config,
        }
    }

    /// Replaces the whole transaction configuration, discarding any
    /// individual settings applied earlier.
    pub fn with_config(self, config: TransactionConfig) -> Self {
        Self { config, ..self }
    }

    /// Replaces the batch configuration.
    pub fn with_batch_config(self, batch_config: BatchConfig) -> Self {
        Self {
            batch_config,
            ..self
        }
    }

    /// Sets the `max_retries` field of the transaction configuration.
    pub fn max_retries(self, max_retries: u32) -> Self {
        let mut builder = self;
        builder.config.max_retries = max_retries;
        builder
    }

    /// Sets the `enable_watch` field of the transaction configuration.
    pub fn enable_watch(self, enable: bool) -> Self {
        let mut builder = self;
        builder.config.enable_watch = enable;
        builder
    }

    /// Sets the transaction timeout to `Some(timeout)`.
    pub fn timeout(self, timeout: Duration) -> Self {
        let mut builder = self;
        builder.config.timeout = Some(timeout);
        builder
    }

    /// Builds a context from clones of the current settings; the builder
    /// itself is only borrowed and can be reused.
    ///
    /// # Panics
    /// Panics if `AsyncTransactionContext::with_config` fails to create its
    /// batch processor.
    pub async fn build(&self) -> AsyncTransactionContext {
        let manager = Arc::clone(&self.manager);
        let transaction_config = self.config.clone();
        let batch_config = self.batch_config.clone();
        AsyncTransactionContext::with_config(manager, transaction_config, Some(batch_config)).await
    }
}