d_engine_server/storage/lease.rs
1//! Default lease implementation for d-engine.
2//!
3//! Provides high-performance lease management with single-index lock-free architecture.
4//! The `Lease` trait is defined in d-engine-core for framework-level abstraction.
5//!
6//! # Architecture
7//!
8//! - **Single index**: DashMap<key, expiration_time> (completely lock-free for register/unregister)
9//! - **Cleanup**: O(N) iteration with time limit and shard read locks (rare operation)
10//!
11//! # Concurrency Model
12//!
//! - **Register**: O(1), no global lock (takes a single shard write lock)
//! - **Unregister**: O(1), no global lock (takes a single shard write lock)
15//! - **Cleanup**: O(N) with shard read locks (frequency: 1/1000 applies, duration: 1ms max)
16//! - **No global Mutex** - Eliminates lock contention under high concurrency
17//!
18//! # Performance vs Dual-Index Design
19//!
20//! Old design (BTreeMap + Mutex):
21//! - Register: O(log N) + Mutex lock → contention under concurrency
22//! - Cleanup: O(K log N) + Mutex lock
23//!
24//! New design (DashMap only):
25//! - Register: O(1) lock-free → zero contention
26//! - Cleanup: O(N) with read locks → no write blocking, rare execution
27
28use std::collections::HashMap;
29use std::sync::atomic::AtomicBool;
30use std::sync::atomic::AtomicU64;
31use std::sync::atomic::Ordering;
32use std::time::Duration;
33use std::time::Instant;
34use std::time::SystemTime;
35
36use bytes::Bytes;
37use d_engine_core::Lease;
38use dashmap::DashMap;
39use serde::Deserialize;
40use serde::Serialize;
41
42use crate::Result;
43
/// Default lease implementation built on a single `DashMap` index.
///
/// For every leased key the map stores the absolute `SystemTime` at which the
/// lease expires. All operations go through `&self`; there is no global mutex
/// in this type, only the sharded map and two atomics.
///
/// # Performance Characteristics
///
/// - `is_expired()`: one map lookup
/// - `register()` / `unregister()`: one map insert / remove
/// - `get_expired_keys()` and the private time-limited cleanup: full scan of the map
///
/// NOTE(review): earlier revisions of this comment quoted ~10-20ns per call and
/// described these operations as "lock-free"; neither is established by this
/// file (DashMap takes per-shard locks) — confirm with benchmarks before relying on it.
///
/// # Memory Usage
///
/// - One map entry per leased key
/// - Expired entries stay in the map until a cleanup call removes them or the
///   key is unregistered
#[derive(Debug)]
pub struct DefaultLease {
    /// Lease cleanup configuration (immutable after creation)
    config: d_engine_core::config::LeaseConfig,

    /// Apply counter for piggyback cleanup frequency.
    /// NOTE(review): within this file it is only zeroed (in `new` and `reload`)
    /// and never incremented, since `on_apply` is a no-op — possibly vestigial.
    apply_counter: AtomicU64,

    /// Single index: key → absolute expiration time.
    /// - Register/Unregister: one insert/remove on the map
    /// - Cleanup: full iteration over the map
    key_to_expiry: DashMap<Bytes, SystemTime>,

    /// Whether any lease has ever been registered.
    /// Set to true on first use and never reset to false by any code in this
    /// file (optimization flag, not an "is currently non-empty" indicator).
    has_keys: AtomicBool,
}
74
75impl DefaultLease {
76 /// Creates a new default lease manager with the given configuration.
77 ///
78 /// # Arguments
79 /// * `config` - Lease cleanup strategy configuration
80 pub fn new(config: d_engine_core::config::LeaseConfig) -> Self {
81 Self {
82 config,
83 apply_counter: AtomicU64::new(0),
84 key_to_expiry: DashMap::new(),
85 has_keys: AtomicBool::new(false),
86 }
87 }
88
89 /// Cleanup expired keys with time limit
90 ///
91 /// Uses DashMap::iter() which acquires read locks on all shards.
92 /// Read locks allow concurrent writes to proceed (only blocks on same shard).
93 ///
94 /// # Performance
95 ///
96 /// - Iteration: O(N) with shard read locks
97 /// - Removal: O(K) where K = expired keys, lock-free per-key
98 /// - Time limit: Stops after max_duration_ms to prevent long pauses
99 /// - Frequency: Only called every N applies (default: 1/1000)
100 ///
101 /// # Arguments
102 /// * `max_duration_ms` - Maximum duration in milliseconds
103 #[allow(dead_code)]
104 fn cleanup_expired_with_limit(
105 &self,
106 max_duration_ms: u64,
107 ) -> Vec<Bytes> {
108 let start = Instant::now();
109 let now = SystemTime::now();
110
111 // Phase 1: collect expired keys (read-only, with time limit)
112 let to_remove: Vec<Bytes> = self
113 .key_to_expiry
114 .iter()
115 .take_while(|_| start.elapsed().as_millis() <= max_duration_ms as u128)
116 .filter(|entry| *entry.value() <= now)
117 .map(|entry| entry.key().clone())
118 .collect();
119
120 // Phase 2: remove after dropping iter (avoids deadlock)
121 to_remove
122 .into_iter()
123 .filter_map(|key| self.key_to_expiry.remove_if(&key, |_, v| *v <= now).map(|(k, _)| k))
124 .collect()
125 }
126
127 /// Get expiration time for a specific key.
128 ///
129 /// # Performance
130 /// O(1) - DashMap lookup, lock-free
131 #[allow(dead_code)]
132 pub fn get_expiration(
133 &self,
134 key: &[u8],
135 ) -> Option<SystemTime> {
136 self.key_to_expiry.get(key).map(|entry| *entry.value())
137 }
138
139 /// Restore from snapshot data (used during initialization).
140 ///
141 /// Filters out already-expired keys during restoration.
142 ///
143 /// # Arguments
144 /// * `data` - Serialized snapshot data
145 /// * `config` - Lease configuration to use
146 pub fn from_snapshot(
147 data: &[u8],
148 config: d_engine_core::config::LeaseConfig,
149 ) -> Self {
150 let snapshot: LeaseSnapshot = match bincode::deserialize(data) {
151 Ok(s) => s,
152 Err(_) => return Self::new(config),
153 };
154
155 let now = SystemTime::now();
156 let manager = Self::new(config);
157
158 // Rebuild single index, skipping expired keys
159 for (key, expire_at) in snapshot.key_to_expiry {
160 if expire_at > now {
161 let key_bytes = Bytes::from(key);
162 manager.key_to_expiry.insert(key_bytes, expire_at);
163 }
164 }
165
166 // Restore has_keys flag if we have any keys
167 if !manager.key_to_expiry.is_empty() {
168 manager.has_keys.store(true, Ordering::Relaxed);
169 }
170
171 manager
172 }
173
174 /// Returns reference to the lease configuration.
175 ///
176 /// Used by StateMachine implementations to check cleanup strategy.
177 pub fn config(&self) -> &d_engine_core::config::LeaseConfig {
178 &self.config
179 }
180}
181
182impl Lease for DefaultLease {
183 /// Register a key with TTL (Time-To-Live).
184 ///
185 /// # TTL Semantics
186 ///
187 /// - **Absolute expiration time**: The expiration time is calculated as `expire_at =
188 /// SystemTime::now() + Duration::from_secs(ttl_secs)` and stored internally.
189 /// - **Crash-safe**: The absolute expiration time survives node restarts. After crash recovery,
190 /// expired keys remain expired (no TTL reset).
191 /// - **Persistent**: The expiration time is persisted to disk during snapshot generation and
192 /// graceful shutdown.
193 ///
194 /// # Example
195 ///
196 /// ```text
197 /// T0: register(key="foo", ttl=10) → expire_at = T0 + 10 = T10
198 /// T5: CRASH
199 /// T12: RESTART
200 /// → WAL replay: expire_at = T10 < T12 (already expired)
201 /// → Key is NOT restored (correctly expired)
202 /// ```
203 ///
204 /// # Arguments
205 ///
206 /// * `key` - The key to register expiration for
207 /// * `ttl_secs` - Time-to-live in seconds from now
208 ///
209 /// # Performance
210 ///
211 /// O(1) - Completely lock-free, only acquires single shard write lock
212 fn register(
213 &self,
214 key: Bytes,
215 ttl_secs: u64,
216 ) {
217 // Mark that lease is being used (lazy activation)
218 self.has_keys.store(true, Ordering::Relaxed);
219
220 // Calculate absolute expiration time
221 let expire_at = SystemTime::now() + Duration::from_secs(ttl_secs);
222
223 // Single index update (overwrites old value if exists)
224 // DashMap::insert is lock-free (only single shard write lock)
225 self.key_to_expiry.insert(key, expire_at);
226 }
227
228 /// Unregister a key's TTL.
229 ///
230 /// # Performance
231 ///
232 /// O(1) - Completely lock-free, only acquires single shard write lock
233 fn unregister(
234 &self,
235 key: &[u8],
236 ) {
237 // Single index removal (lock-free)
238 self.key_to_expiry.remove(key);
239 }
240
241 /// Check if a key has expired.
242 ///
243 /// # Performance
244 ///
245 /// O(1) - Lock-free DashMap lookup
246 fn is_expired(
247 &self,
248 key: &[u8],
249 ) -> bool {
250 if let Some(expire_at) = self.key_to_expiry.get(key) {
251 *expire_at <= SystemTime::now()
252 } else {
253 false
254 }
255 }
256
257 /// Get all expired keys (without time limit).
258 ///
259 /// This method is rarely used directly. Most cleanup happens via `on_apply()`.
260 ///
261 /// # Performance
262 ///
263 /// O(N) - Iterates all keys with shard read locks
264 fn get_expired_keys(
265 &self,
266 now: SystemTime,
267 ) -> Vec<Bytes> {
268 // Phase 1: collect expired keys (read-only)
269 let to_remove: Vec<Bytes> = self
270 .key_to_expiry
271 .iter()
272 .filter(|entry| *entry.value() <= now)
273 .map(|entry| entry.key().clone())
274 .collect();
275
276 // Phase 2: remove after dropping iter (avoids deadlock)
277 to_remove
278 .into_iter()
279 .filter_map(|key| self.key_to_expiry.remove_if(&key, |_, v| *v <= now).map(|(k, _)| k))
280 .collect()
281 }
282
283 /// Piggyback cleanup on apply operations (DEPRECATED - no longer used).
284 ///
285 /// This method is kept for backward compatibility but is no longer called.
286 /// Cleanup is now handled by:
287 /// - Lazy strategy: cleanup in get() method
288 /// - Background strategy: dedicated async task
289 ///
290 /// # Performance
291 ///
292 /// - Fast path: O(1) - always returns empty vec
293 fn on_apply(&self) -> Vec<Bytes> {
294 // No-op: piggyback cleanup removed to avoid blocking apply path
295 vec![]
296 }
297
298 /// Check if any keys have been registered.
299 ///
300 /// # Performance
301 ///
302 /// O(1) - Single atomic load
303 fn has_lease_keys(&self) -> bool {
304 self.has_keys.load(Ordering::Relaxed)
305 }
306
307 /// Quick check if there might be expired keys.
308 ///
309 /// This is a heuristic check - samples first 10 entries.
310 /// May return false negatives but never false positives.
311 ///
312 /// # Performance
313 ///
314 /// O(1) - Checks first few entries only
315 fn may_have_expired_keys(
316 &self,
317 now: SystemTime,
318 ) -> bool {
319 if !self.has_lease_keys() {
320 return false;
321 }
322
323 // Quick check: iterate first 10 entries
324 // DashMap::iter().take(10) is cheap (early termination)
325 for entry in self.key_to_expiry.iter().take(10) {
326 if *entry.value() <= now {
327 return true;
328 }
329 }
330
331 false
332 }
333
334 /// Get total number of keys with active leases.
335 ///
336 /// # Performance
337 ///
338 /// O(1) - DashMap maintains internal count
339 fn len(&self) -> usize {
340 self.key_to_expiry.len()
341 }
342
343 /// Serialize current lease state to snapshot.
344 ///
345 /// # Performance
346 ///
347 /// O(N) - Iterates all keys with shard read locks
348 fn to_snapshot(&self) -> Vec<u8> {
349 let snapshot = LeaseSnapshot {
350 key_to_expiry: self
351 .key_to_expiry
352 .iter()
353 .map(|entry| (entry.key().to_vec(), *entry.value()))
354 .collect(),
355 };
356 bincode::serialize(&snapshot).unwrap_or_default()
357 }
358
359 /// Reload lease state from snapshot.
360 ///
361 /// Filters out already-expired keys during restoration.
362 ///
363 /// # Performance
364 ///
365 /// O(N) - Rebuilds single index
366 fn reload(
367 &self,
368 data: &[u8],
369 ) -> Result<()> {
370 let snapshot: LeaseSnapshot = bincode::deserialize(data).map_err(|e| {
371 crate::Error::System(d_engine_core::SystemError::Storage(
372 d_engine_core::StorageError::StateMachineError(format!(
373 "Failed to deserialize lease snapshot: {e}"
374 )),
375 ))
376 })?;
377
378 let now = SystemTime::now();
379
380 // Clear existing data
381 self.key_to_expiry.clear();
382 self.apply_counter.store(0, Ordering::Relaxed);
383
384 // Rebuild single index, skipping expired keys
385 for (key, expire_at) in snapshot.key_to_expiry {
386 if expire_at > now {
387 let key_bytes = Bytes::from(key);
388 self.key_to_expiry.insert(key_bytes, expire_at);
389 }
390 }
391
392 // Update has_keys flag
393 if !self.key_to_expiry.is_empty() {
394 self.has_keys.store(true, Ordering::Relaxed);
395 }
396
397 Ok(())
398 }
399}
400
/// Snapshot-serializable lease state.
///
/// Produced by `to_snapshot` and consumed by `reload` / `from_snapshot`, both
/// via bincode. Keys are stored as plain `Vec<u8>` rather than `Bytes`.
#[derive(Debug, Serialize, Deserialize)]
struct LeaseSnapshot {
    /// Key bytes → absolute expiration time, copied from the live index.
    key_to_expiry: HashMap<Vec<u8>, SystemTime>,
}