#[cfg(feature = "batch-write")]
use crate::backend::{l1::L1Backend, l2::L2Backend};
#[cfg(feature = "batch-write")]
use crate::error::Result;
#[cfg(feature = "batch-write")]
use crate::recovery::health::HealthState;
#[cfg(feature = "batch-write")]
use dashmap::DashMap;
#[cfg(feature = "batch-write")]
use std::sync::Arc;
#[cfg(feature = "batch-write")]
use tokio::sync::{Notify, RwLock};
#[cfg(feature = "batch-write")]
pub struct PromotionManager {
in_flight: DashMap<String, Arc<Notify>>,
l1: Arc<L1Backend>,
l2: Arc<L2Backend>,
#[allow(dead_code)]
health_state: Arc<RwLock<HealthState>>,
}
#[cfg(feature = "batch-write")]
impl PromotionManager {
pub fn new(
l1: Arc<L1Backend>,
l2: Arc<L2Backend>,
health_state: Arc<RwLock<HealthState>>,
) -> Self {
Self {
in_flight: DashMap::new(),
l1,
l2,
health_state,
}
}
pub async fn promote(&self, key: String, value: Vec<u8>, version: u64) -> Result<()> {
let notify = self.in_flight.get(&key).map(|r| r.value().clone());
if let Some(notify) = notify {
notify.notified().await;
return Ok(());
}
let notify = Arc::new(Notify::new());
self.in_flight.insert(key.clone(), notify.clone());
let result = async {
let l2_ttl = self.l2.ttl(&key).await?;
let l1_default_ttl = 300;
let actual_ttl = match l2_ttl {
Some(ttl) if ttl > 5 => ttl.min(l1_default_ttl),
_ => return Ok(()),
};
self.l1
.set_with_metadata(&key, value, actual_ttl, version)
.await
}
.await;
if let Some((_, n)) = self.in_flight.remove(&key) {
n.notify_waiters();
}
result
}
}
#[cfg(not(feature = "batch-write"))]
use crate::error::Result;
#[cfg(not(feature = "batch-write"))]
use std::sync::Arc;
#[cfg(not(feature = "batch-write"))]
#[derive(Debug, Clone, Default)]
pub struct PromotionManager;
#[cfg(not(feature = "batch-write"))]
impl PromotionManager {
pub fn new(
_l1: Arc<L1Backend>,
_l2: Arc<L2Backend>,
_health_state: Arc<tokio::sync::RwLock<HealthState>>,
) -> Self {
Self
}
pub async fn promote(&self, _key: String, _value: Vec<u8>, _version: u64) -> Result<()> {
Ok(())
}
}
#[cfg(not(feature = "batch-write"))]
use crate::backend::{l1::L1Backend, l2::L2Backend};
#[cfg(not(feature = "batch-write"))]
use crate::recovery::health::HealthState;