use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::{Error, Message, buffer::Buffer};
/// Configuration for [`RedisBuffer`].
///
/// Derives `Serialize`/`Deserialize`, so it can be converted to and from
/// serde-supported formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RedisBufferConfig {
/// Presumably the Redis connection URL.
/// NOTE(review): not read anywhere in the visible code — confirm how it is consumed.
pub url: String,
/// Prefix placed at the start of every key name built by `RedisBuffer`'s key helpers.
pub key_prefix: String,
/// Upper bound for the buffer's size counter; `push` returns an error once the
/// counter has reached this value.
pub max_size: usize,
/// Timeout, presumably in milliseconds going by the field name.
/// NOTE(review): not read anywhere in the visible code — confirm what it bounds.
pub timeout_ms: u64,
}
/// Buffer that tracks a connected flag and a bounded element counter.
///
/// NOTE(review): the visible code never talks to Redis; `push` only bumps the
/// counter and `pop` returns a fixed placeholder message — this looks like a
/// simulated implementation, confirm before relying on it.
pub struct RedisBuffer {
// Own copy of the configuration (cloned from the caller's value in `new`).
config: RedisBufferConfig,
// Connected flag: initialised to `false` by `new`, checked by `push`/`pop`,
// and cleared by `close`. Nothing in the visible code sets it to `true`.
connected: AtomicBool,
// Number of elements currently counted as buffered: incremented by `push`,
// decremented by `pop`, reset to zero by `close`.
size: Mutex<usize>,
}
impl RedisBuffer {
    /// Builds a buffer from `config`.
    ///
    /// The configuration is cloned into the new instance. The buffer starts
    /// with its connected flag cleared and its size counter at zero.
    ///
    /// # Errors
    ///
    /// As written, this constructor always returns `Ok`.
    pub fn new(config: &RedisBufferConfig) -> Result<Self, Error> {
        let buffer = Self {
            size: Mutex::new(0),
            connected: AtomicBool::new(false),
            config: config.clone(),
        };
        Ok(buffer)
    }

    /// Returns the key name for the message at `index`, in the form
    /// `<key_prefix>_msg_<index>`.
    fn generate_key(&self, index: usize) -> String {
        let prefix = &self.config.key_prefix;
        format!("{}_msg_{}", prefix, index)
    }

    /// Returns the key name `<key_prefix>_read_index`.
    fn read_index_key(&self) -> String {
        let prefix = &self.config.key_prefix;
        format!("{}_read_index", prefix)
    }

    /// Returns the key name `<key_prefix>_write_index`.
    fn write_index_key(&self) -> String {
        let prefix = &self.config.key_prefix;
        format!("{}_write_index", prefix)
    }
}
#[async_trait]
impl Buffer for RedisBuffer {
async fn push(&self, msg: &Message) -> Result<(), Error> {
if !self.connected.load(Ordering::SeqCst) {
return Err(Error::Connection("缓冲区未连接".to_string()));
}
let mut size = self.size.lock().map_err(|e| Error::Unknown(e.to_string()))?;
if *size >= self.config.max_size {
return Err(Error::Processing("Redis缓冲区已满".to_string()));
}
*size += 1;
Ok(())
}
async fn pop(&self) -> Result<Option<Message>, Error> {
if !self.connected.load(Ordering::SeqCst) {
return Err(Error::Connection("缓冲区未连接".to_string()));
}
let mut size = self.size.lock().map_err(|e| Error::Unknown(e.to_string()))?;
if *size == 0 {
return Ok(None);
}
*size -= 1;
Ok(Some(Message::from_string("模拟Redis缓冲区消息")))
}
async fn close(&self) -> Result<(), Error> {
self.connected.store(false, Ordering::SeqCst);
let mut size = self.size.lock().map_err(|e| Error::Unknown(e.to_string()))?;
*size = 0;
Ok(())
}
}