#[cfg(feature = "batch-write")]
use crate::backend::l1::L1Backend;
#[cfg(feature = "batch-write")]
use crate::error::Result;
#[cfg(feature = "batch-write")]
use crate::recovery::health::HealthState;
#[cfg(feature = "batch-write")]
use futures::stream::StreamExt;
#[cfg(feature = "batch-write")]
use std::sync::Arc;
#[cfg(feature = "batch-write")]
use tokio::sync::RwLock;
#[cfg(feature = "batch-write")]
use tracing::{debug, instrument};
/// Listens on a Redis pub/sub channel and deletes the announced keys from the
/// local L1 cache.
///
/// Only compiled when the `batch-write` feature is enabled.
#[cfg(feature = "batch-write")]
pub struct InvalidationSubscriber {
    /// Redis client used by `start` to open the pub/sub connection.
    client: redis::Client,
    /// L1 cache whose entries are deleted when an invalidation message arrives.
    l1: Arc<L1Backend>,
    /// Name of the Redis channel that `start` subscribes to.
    channel: String,
    /// Shared health state, read before handling each message; invalidations
    /// are applied only while it is `HealthState::Healthy`.
    health_state: Arc<RwLock<HealthState>>,
}
#[cfg(feature = "batch-write")]
impl InvalidationSubscriber {
    /// Builds a subscriber from its parts; nothing is connected until
    /// [`start`](Self::start) is called.
    pub fn new(
        client: redis::Client,
        l1: Arc<L1Backend>,
        channel: String,
        health_state: Arc<RwLock<HealthState>>,
    ) -> Self {
        Self {
            client,
            l1,
            channel,
            health_state,
        }
    }

    /// Subscribes to the invalidation channel and spawns a detached Tokio task
    /// that processes incoming messages.
    ///
    /// Each message payload is interpreted as a cache key. While the shared
    /// health state is `HealthState::Healthy` the key is deleted from L1; in
    /// every other state the message is skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if the Redis connection cannot be opened or the
    /// `SUBSCRIBE` command fails. Failures inside the spawned task (payload
    /// decoding, L1 deletion, stream termination) are logged, not returned.
    #[instrument(skip(self), level = "debug")]
    pub async fn start(self) -> Result<()> {
        // The deprecated connection type is kept because `into_pubsub` is
        // called on the value it returns.
        #[allow(deprecated)]
        let conn = self.client.get_async_connection().await?;
        let mut pubsub = conn.into_pubsub();
        pubsub.subscribe(&self.channel).await?;
        debug!("InvalidationSubscriber: 启动订阅者,频道={}", self.channel);

        // `self` is consumed, so the shared handles can be moved into the task
        // instead of being cloned.
        let Self {
            l1,
            channel,
            health_state,
            ..
        } = self;

        tokio::spawn(async move {
            let mut stream = pubsub.on_message();
            while let Some(msg) = stream.next().await {
                debug!("InvalidationSubscriber: 收到消息");

                let state = health_state.read().await;
                debug!("InvalidationSubscriber: 当前健康状态={:?}", *state);
                let should_invalidate = match *state {
                    HealthState::Healthy => true,
                    HealthState::Degraded { .. } | HealthState::Recovering { .. } => {
                        debug!("Skipping invalidation during Redis outage");
                        false
                    }
                    HealthState::WalReplaying { .. } => {
                        debug!("Skipping invalidation during WAL replay");
                        false
                    }
                };
                // Release the read lock before awaiting the L1 deletion.
                drop(state);
                if !should_invalidate {
                    continue;
                }

                let payload: String = match msg.get_payload() {
                    Ok(payload) => payload,
                    Err(e) => {
                        debug!("InvalidationSubscriber: 解析消息失败: {}", e);
                        continue;
                    }
                };
                debug!("InvalidationSubscriber: 处理失效消息,key={}", payload);

                // Report the outcome truthfully: the success message is only
                // logged when the deletion actually succeeded.
                match l1.delete(&payload).await {
                    Ok(_) => debug!("L1键已失效: {}", payload),
                    Err(e) => tracing::warn!(
                        "InvalidationSubscriber: failed to invalidate L1 key {}: {}",
                        payload,
                        e
                    ),
                }
            }

            // The stream only ends when the pub/sub connection is gone; without
            // this message the L1 cache would silently stop being invalidated.
            tracing::warn!(
                "InvalidationSubscriber: subscription stream for channel {} ended; \
                 L1 invalidations are no longer being received",
                channel
            );
        });
        Ok(())
    }
}
/// Publishes cache-key invalidation messages to a Redis pub/sub channel.
///
/// Only compiled when the `batch-write` feature is enabled.
#[cfg(feature = "batch-write")]
pub struct InvalidationPublisher {
    /// Connection manager; cloned for every `publish` call.
    manager: redis::aio::ConnectionManager,
    /// Name of the Redis channel that messages are published to.
    channel: String,
}
#[cfg(feature = "batch-write")]
impl InvalidationPublisher {
    /// Creates a publisher that sends invalidation messages to `channel`
    /// through `manager`.
    pub fn new(manager: redis::aio::ConnectionManager, channel: String) -> Self {
        InvalidationPublisher { manager, channel }
    }

    /// Publishes `key` on the invalidation channel with a Redis `PUBLISH`
    /// command.
    ///
    /// # Errors
    ///
    /// Returns an error if the command fails or its reply cannot be read as
    /// an integer.
    #[instrument(skip(self), level = "debug")]
    pub async fn publish(&self, key: &str) -> Result<()> {
        debug!("InvalidationPublisher: 发布失效消息,key={}", key);

        // `query_async` needs a mutable connection, so work on a clone of the
        // manager rather than requiring `&mut self`.
        let mut connection = self.manager.clone();

        let mut command = redis::cmd("PUBLISH");
        command.arg(&self.channel).arg(key);
        // The integer reply is decoded but deliberately not used.
        let _reply: i32 = command.query_async(&mut connection).await?;

        debug!("InvalidationPublisher: 失效消息发布成功,key={}", key);
        Ok(())
    }
}
#[cfg(not(feature = "batch-write"))]
use crate::error::Result;
#[cfg(not(feature = "batch-write"))]
use std::sync::Arc;
/// No-op stand-in for the invalidation subscriber, compiled when the
/// `batch-write` feature is disabled.
#[cfg(not(feature = "batch-write"))]
#[derive(Debug, Clone, Default)]
pub struct InvalidationSubscriber;
#[cfg(not(feature = "batch-write"))]
impl InvalidationSubscriber {
pub fn new(
_client: redis::Client,
_l1: Arc<L1Backend>,
_channel: String,
_health_state: Arc<tokio::sync::RwLock<HealthState>>,
) -> Self {
Self
}
pub async fn start(self) -> Result<()> {
Ok(())
}
}
/// No-op stand-in for the invalidation publisher, compiled when the
/// `batch-write` feature is disabled.
#[cfg(not(feature = "batch-write"))]
#[derive(Debug, Clone, Default)]
pub struct InvalidationPublisher;
#[cfg(not(feature = "batch-write"))]
impl InvalidationPublisher {
pub fn new(_manager: redis::aio::ConnectionManager, _channel: String) -> Self {
Self
}
pub async fn publish(&self, _key: &str) -> Result<()> {
Ok(())
}
}
#[cfg(not(feature = "batch-write"))]
use crate::backend::l1::L1Backend;
#[cfg(not(feature = "batch-write"))]
use crate::recovery::health::HealthState;