use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use bytes::Bytes;
use d_engine_core::Lease;
use dashmap::DashMap;
use serde::Deserialize;
use serde::Serialize;
use crate::Result;
/// Lease tracker that maps each leased key to an absolute expiration time.
///
/// All state lives behind interior-mutable containers (`DashMap` and
/// atomics), so every method takes `&self`.
#[derive(Debug)]
pub struct DefaultLease {
/// Lease configuration supplied at construction; exposed through `config()`.
config: d_engine_core::config::LeaseConfig,
/// Counter initialised to 0 by `new` and reset to 0 by `reload`. Nothing in
/// the visible code increments or reads it.
/// NOTE(review): presumably meant to count applied entries — confirm usage.
apply_counter: AtomicU64,
/// Key -> wall-clock instant at which that key's lease expires.
key_to_expiry: DashMap<Bytes, SystemTime>,
/// Set to `true` once any key is registered or restored from a snapshot.
/// The visible code never sets it back to `false`, so it answers "has a
/// lease ever been tracked", not "is the map currently non-empty".
has_keys: AtomicBool,
}
impl DefaultLease {
    /// Builds an empty lease tracker that owns `config`.
    ///
    /// The expiry map starts empty, the apply counter at zero and the
    /// `has_keys` flag cleared.
    pub fn new(config: d_engine_core::config::LeaseConfig) -> Self {
        Self {
            config,
            apply_counter: AtomicU64::new(0),
            key_to_expiry: DashMap::new(),
            has_keys: AtomicBool::new(false),
        }
    }

    /// Removes expired leases, spending at most roughly `max_duration_ms`
    /// milliseconds on the scan.
    ///
    /// Works in two phases:
    /// 1. Walk the map and collect keys whose deadline is at or before "now".
    ///    The walk stops as soon as the elapsed time exceeds the budget.
    /// 2. Remove each collected key, re-checking the deadline at removal time
    ///    so that a key whose deadline was pushed past "now" in between is
    ///    left in place.
    ///
    /// Only the scan is time-bounded; the removal phase always runs to
    /// completion for the collected keys.
    ///
    /// # Returns
    /// The keys that were actually removed.
    #[allow(dead_code)]
    fn cleanup_expired_with_limit(
        &self,
        max_duration_ms: u64,
    ) -> Vec<Bytes> {
        let started = Instant::now();
        let now = SystemTime::now();
        let budget_ms = u128::from(max_duration_ms);

        // Phase 1: gather candidates. The map iterator is dropped when this
        // loop ends, i.e. before any removal is attempted.
        let mut candidates: Vec<Bytes> = Vec::new();
        for entry in self.key_to_expiry.iter() {
            if started.elapsed().as_millis() > budget_ms {
                break;
            }
            if *entry.value() <= now {
                candidates.push(entry.key().clone());
            }
        }

        // Phase 2: remove, but only if the entry is still expired.
        let mut removed: Vec<Bytes> = Vec::new();
        for candidate in candidates {
            let evicted = self.key_to_expiry.remove_if(&candidate, |_, deadline| *deadline <= now);
            if let Some((key, _)) = evicted {
                removed.push(key);
            }
        }
        removed
    }

    /// Looks up the expiration time recorded for `key`.
    ///
    /// # Returns
    /// `Some(deadline)` if the key has a lease, `None` otherwise. The lease
    /// may already be in the past; this method does not check.
    #[allow(dead_code)]
    pub fn get_expiration(
        &self,
        key: &[u8],
    ) -> Option<SystemTime> {
        let entry = self.key_to_expiry.get(key)?;
        Some(*entry.value())
    }

    /// Reconstructs a lease tracker from bytes produced by `to_snapshot`.
    ///
    /// Entries whose deadline is not strictly in the future are discarded.
    /// If `data` cannot be decoded, the error is swallowed and an empty
    /// tracker is returned.
    pub fn from_snapshot(
        data: &[u8],
        config: d_engine_core::config::LeaseConfig,
    ) -> Self {
        let restored = Self::new(config);

        let decoded: Option<LeaseSnapshot> = bincode::deserialize(data).ok();
        if let Some(snapshot) = decoded {
            let now = SystemTime::now();
            snapshot
                .key_to_expiry
                .into_iter()
                .filter(|(_, expire_at)| *expire_at > now)
                .for_each(|(raw_key, expire_at)| {
                    restored.key_to_expiry.insert(Bytes::from(raw_key), expire_at);
                });

            // Only raise the flag when at least one live lease survived.
            if !restored.key_to_expiry.is_empty() {
                restored.has_keys.store(true, Ordering::Relaxed);
            }
        }

        restored
    }

    /// Returns the lease configuration this tracker was built with.
    pub fn config(&self) -> &d_engine_core::config::LeaseConfig {
        &self.config
    }
}
impl Lease for DefaultLease {
    /// Registers (or replaces) the lease for `key` so that it expires
    /// `ttl_secs` seconds from now.
    ///
    /// The deadline is computed with checked arithmetic. `SystemTime + Duration`
    /// panics when the result is not representable, which a very large
    /// caller-supplied TTL (e.g. `u64::MAX`) would trigger. On overflow the TTL
    /// is halved until the deadline fits, yielding a far-future deadline
    /// instead of a panic. Deadlines that are representable are unaffected.
    fn register(
        &self,
        key: Bytes,
        ttl_secs: u64,
    ) {
        // Raise the flag before inserting so the fast-path check in
        // `may_have_expired_keys` cannot stay false once the entry exists.
        self.has_keys.store(true, Ordering::Relaxed);

        let now = SystemTime::now();
        let mut ttl = Duration::from_secs(ttl_secs);
        // Terminates: the TTL eventually reaches zero, and `now + 0` is
        // always representable.
        let expire_at = loop {
            match now.checked_add(ttl) {
                Some(deadline) => break deadline,
                None => ttl /= 2,
            }
        };

        self.key_to_expiry.insert(key, expire_at);
    }

    /// Drops the lease for `key`, if any.
    ///
    /// The `has_keys` flag is intentionally left untouched.
    fn unregister(
        &self,
        key: &[u8],
    ) {
        self.key_to_expiry.remove(key);
    }

    /// Reports whether `key` has a lease whose deadline is at or before the
    /// current wall-clock time.
    ///
    /// Keys without a lease are never considered expired.
    fn is_expired(
        &self,
        key: &[u8],
    ) -> bool {
        match self.key_to_expiry.get(key) {
            Some(expire_at) => *expire_at <= SystemTime::now(),
            None => false,
        }
    }

    /// Removes every lease whose deadline is at or before `now` and returns
    /// the removed keys.
    ///
    /// Candidates are collected first and removed afterwards, so no map
    /// reference is held while removing. The deadline is re-checked inside
    /// `remove_if`, so a key re-registered with a later deadline between the
    /// two phases is kept.
    fn get_expired_keys(
        &self,
        now: SystemTime,
    ) -> Vec<Bytes> {
        let candidates: Vec<Bytes> = self
            .key_to_expiry
            .iter()
            .filter(|entry| *entry.value() <= now)
            .map(|entry| entry.key().clone())
            .collect();

        candidates
            .into_iter()
            .filter_map(|key| {
                self.key_to_expiry
                    .remove_if(&key, |_, deadline| *deadline <= now)
                    .map(|(removed_key, _)| removed_key)
            })
            .collect()
    }

    /// Apply hook; this implementation does no work and always returns an
    /// empty vector.
    fn on_apply(&self) -> Vec<Bytes> {
        Vec::new()
    }

    /// Returns `true` once any lease has been registered or restored.
    ///
    /// The flag is never cleared by the visible code, so `true` does not
    /// guarantee that the map is currently non-empty.
    fn has_lease_keys(&self) -> bool {
        self.has_keys.load(Ordering::Relaxed)
    }

    /// Cheap, sampled check for expired leases.
    ///
    /// Returns `false` immediately if no lease was ever tracked. Otherwise it
    /// inspects at most the first 10 entries yielded by the map iterator and
    /// returns `true` if any of them has a deadline at or before `now`.
    /// A `false` result therefore does not prove that no lease has expired.
    fn may_have_expired_keys(
        &self,
        now: SystemTime,
    ) -> bool {
        self.has_lease_keys()
            && self
                .key_to_expiry
                .iter()
                .take(10)
                .any(|entry| *entry.value() <= now)
    }

    /// Number of leases currently tracked (expired-but-not-yet-removed
    /// entries included).
    fn len(&self) -> usize {
        self.key_to_expiry.len()
    }

    /// Serializes all tracked leases with bincode.
    ///
    /// If serialization fails the error is swallowed and an empty byte vector
    /// is returned.
    fn to_snapshot(&self) -> Vec<u8> {
        let snapshot = LeaseSnapshot {
            key_to_expiry: self
                .key_to_expiry
                .iter()
                .map(|entry| (entry.key().to_vec(), *entry.value()))
                .collect(),
        };
        bincode::serialize(&snapshot).unwrap_or_default()
    }

    /// Replaces the tracked leases with the contents of a snapshot.
    ///
    /// Decoding happens before any state is touched, so on error the existing
    /// leases are left intact. On success the map is cleared, the apply
    /// counter is reset to zero, and only entries whose deadline is strictly
    /// in the future are loaded.
    ///
    /// # Errors
    /// Returns a `StateMachineError` storage error when `data` cannot be
    /// deserialized.
    fn reload(
        &self,
        data: &[u8],
    ) -> Result<()> {
        let snapshot: LeaseSnapshot = bincode::deserialize(data).map_err(|e| {
            crate::Error::System(d_engine_core::SystemError::Storage(
                d_engine_core::StorageError::StateMachineError(format!(
                    "Failed to deserialize lease snapshot: {e}"
                )),
            ))
        })?;

        let now = SystemTime::now();
        self.key_to_expiry.clear();
        self.apply_counter.store(0, Ordering::Relaxed);

        for (key, expire_at) in snapshot.key_to_expiry {
            if expire_at > now {
                self.key_to_expiry.insert(Bytes::from(key), expire_at);
            }
        }

        // The flag is only ever raised here; it is deliberately not lowered
        // when the snapshot turns out to be empty.
        if !self.key_to_expiry.is_empty() {
            self.has_keys.store(true, Ordering::Relaxed);
        }
        Ok(())
    }
}
/// Serializable form of the lease table, encoded and decoded with bincode
/// by `to_snapshot`, `from_snapshot` and `reload`.
#[derive(Debug, Serialize, Deserialize)]
struct LeaseSnapshot {
/// Key bytes -> absolute expiration time. Keys are stored as `Vec<u8>`
/// here and converted to and from `Bytes` at the snapshot boundary.
key_to_expiry: HashMap<Vec<u8>, SystemTime>,
}