d_engine_server/storage/
lease.rs

//! Default lease implementation for d-engine.
//!
//! Provides lease (TTL) management backed by a single concurrent index with no global lock.
//! The `Lease` trait is defined in d-engine-core for framework-level abstraction.
//!
//! # Architecture
//!
//! - **Single index**: `DashMap<key, expiration_time>`, a sharded concurrent map. There is no
//!   global lock, but every operation takes the lock of the shard it touches.
//! - **Cleanup**: O(N) iteration under shard read locks, followed by per-key removal; the
//!   time-limited variant stops scanning once its time budget is exhausted.
//!
//! # Concurrency Model
//!
//! - **Register**: O(1), takes a single shard write lock
//! - **Unregister**: O(1), takes a single shard write lock
//! - **Cleanup**: O(N) with shard read locks. It is no longer piggybacked on `on_apply()`
//!   (now a no-op); it is driven by the configured lazy or background cleanup strategy.
//! - **No global Mutex**: operations on keys in different shards do not contend with each other
//!
//! # Performance vs Dual-Index Design
//!
//! Old design (BTreeMap + Mutex):
//! - Register: O(log N) + global Mutex lock → contention under concurrency
//! - Cleanup: O(K log N) + global Mutex lock
//!
//! New design (DashMap only):
//! - Register: O(1) with a per-shard lock → contention only between keys in the same shard
//! - Cleanup: O(N) with shard read locks → runs rarely, so the linear scan is an acceptable cost
27
28use std::collections::HashMap;
29use std::sync::atomic::AtomicBool;
30use std::sync::atomic::AtomicU64;
31use std::sync::atomic::Ordering;
32use std::time::Duration;
33use std::time::Instant;
34use std::time::SystemTime;
35
36use bytes::Bytes;
37use d_engine_core::Lease;
38use dashmap::DashMap;
39use serde::Deserialize;
40use serde::Serialize;
41
42use crate::Result;
43
44/// Default lease implementation with single-index lock-free architecture.
45///
46/// # Performance Characteristics
47///
48/// - `is_expired()`: O(1), ~10-20ns, lock-free
49/// - `register()`: O(1), ~20ns, lock-free (single shard write lock)
50/// - `unregister()`: O(1), ~20ns, lock-free (single shard write lock)
51/// - `cleanup()`: O(N), time-limited, shard read locks (rare: 1/1000 applies)
52///
53/// # Memory Usage
54///
55/// - Per-key overhead: ~50 bytes (single DashMap entry)
56/// - Expired keys are removed automatically during cleanup
57#[derive(Debug)]
58pub struct DefaultLease {
59    /// Lease cleanup configuration (immutable after creation)
60    config: d_engine_core::config::LeaseConfig,
61
62    /// Apply counter for piggyback cleanup frequency
63    apply_counter: AtomicU64,
64
65    /// ✅ Single index: key → expiration_time (completely lock-free)
66    /// - Register/Unregister: O(1), single shard lock
67    /// - Cleanup: O(N) iteration with shard read locks
68    key_to_expiry: DashMap<Bytes, SystemTime>,
69
70    /// Whether any lease has ever been registered
71    /// Once true, stays true forever (optimization flag)
72    has_keys: AtomicBool,
73}
74
75impl DefaultLease {
76    /// Creates a new default lease manager with the given configuration.
77    ///
78    /// # Arguments
79    /// * `config` - Lease cleanup strategy configuration
80    pub fn new(config: d_engine_core::config::LeaseConfig) -> Self {
81        Self {
82            config,
83            apply_counter: AtomicU64::new(0),
84            key_to_expiry: DashMap::new(),
85            has_keys: AtomicBool::new(false),
86        }
87    }
88
89    /// Cleanup expired keys with time limit
90    ///
91    /// Uses DashMap::iter() which acquires read locks on all shards.
92    /// Read locks allow concurrent writes to proceed (only blocks on same shard).
93    ///
94    /// # Performance
95    ///
96    /// - Iteration: O(N) with shard read locks
97    /// - Removal: O(K) where K = expired keys, lock-free per-key
98    /// - Time limit: Stops after max_duration_ms to prevent long pauses
99    /// - Frequency: Only called every N applies (default: 1/1000)
100    ///
101    /// # Arguments
102    /// * `max_duration_ms` - Maximum duration in milliseconds
103    #[allow(dead_code)]
104    fn cleanup_expired_with_limit(
105        &self,
106        max_duration_ms: u64,
107    ) -> Vec<Bytes> {
108        let start = Instant::now();
109        let now = SystemTime::now();
110
111        // Phase 1: collect expired keys (read-only, with time limit)
112        let to_remove: Vec<Bytes> = self
113            .key_to_expiry
114            .iter()
115            .take_while(|_| start.elapsed().as_millis() <= max_duration_ms as u128)
116            .filter(|entry| *entry.value() <= now)
117            .map(|entry| entry.key().clone())
118            .collect();
119
120        // Phase 2: remove after dropping iter (avoids deadlock)
121        to_remove
122            .into_iter()
123            .filter_map(|key| self.key_to_expiry.remove_if(&key, |_, v| *v <= now).map(|(k, _)| k))
124            .collect()
125    }
126
127    /// Get expiration time for a specific key.
128    ///
129    /// # Performance
130    /// O(1) - DashMap lookup, lock-free
131    #[allow(dead_code)]
132    pub fn get_expiration(
133        &self,
134        key: &[u8],
135    ) -> Option<SystemTime> {
136        self.key_to_expiry.get(key).map(|entry| *entry.value())
137    }
138
139    /// Restore from snapshot data (used during initialization).
140    ///
141    /// Filters out already-expired keys during restoration.
142    ///
143    /// # Arguments
144    /// * `data` - Serialized snapshot data
145    /// * `config` - Lease configuration to use
146    pub fn from_snapshot(
147        data: &[u8],
148        config: d_engine_core::config::LeaseConfig,
149    ) -> Self {
150        let snapshot: LeaseSnapshot = match bincode::deserialize(data) {
151            Ok(s) => s,
152            Err(_) => return Self::new(config),
153        };
154
155        let now = SystemTime::now();
156        let manager = Self::new(config);
157
158        // Rebuild single index, skipping expired keys
159        for (key, expire_at) in snapshot.key_to_expiry {
160            if expire_at > now {
161                let key_bytes = Bytes::from(key);
162                manager.key_to_expiry.insert(key_bytes, expire_at);
163            }
164        }
165
166        // Restore has_keys flag if we have any keys
167        if !manager.key_to_expiry.is_empty() {
168            manager.has_keys.store(true, Ordering::Relaxed);
169        }
170
171        manager
172    }
173
174    /// Returns reference to the lease configuration.
175    ///
176    /// Used by StateMachine implementations to check cleanup strategy.
177    pub fn config(&self) -> &d_engine_core::config::LeaseConfig {
178        &self.config
179    }
180}
181
182impl Lease for DefaultLease {
183    /// Register a key with TTL (Time-To-Live).
184    ///
185    /// # TTL Semantics
186    ///
187    /// - **Absolute expiration time**: The expiration time is calculated as `expire_at =
188    ///   SystemTime::now() + Duration::from_secs(ttl_secs)` and stored internally.
189    /// - **Crash-safe**: The absolute expiration time survives node restarts. After crash recovery,
190    ///   expired keys remain expired (no TTL reset).
191    /// - **Persistent**: The expiration time is persisted to disk during snapshot generation and
192    ///   graceful shutdown.
193    ///
194    /// # Example
195    ///
196    /// ```text
197    /// T0:  register(key="foo", ttl=10) → expire_at = T0 + 10 = T10
198    /// T5:  CRASH
199    /// T12: RESTART
200    ///      → WAL replay: expire_at = T10 < T12 (already expired)
201    ///      → Key is NOT restored (correctly expired)
202    /// ```
203    ///
204    /// # Arguments
205    ///
206    /// * `key` - The key to register expiration for
207    /// * `ttl_secs` - Time-to-live in seconds from now
208    ///
209    /// # Performance
210    ///
211    /// O(1) - Completely lock-free, only acquires single shard write lock
212    fn register(
213        &self,
214        key: Bytes,
215        ttl_secs: u64,
216    ) {
217        // Mark that lease is being used (lazy activation)
218        self.has_keys.store(true, Ordering::Relaxed);
219
220        // Calculate absolute expiration time
221        let expire_at = SystemTime::now() + Duration::from_secs(ttl_secs);
222
223        // Single index update (overwrites old value if exists)
224        // DashMap::insert is lock-free (only single shard write lock)
225        self.key_to_expiry.insert(key, expire_at);
226    }
227
228    /// Unregister a key's TTL.
229    ///
230    /// # Performance
231    ///
232    /// O(1) - Completely lock-free, only acquires single shard write lock
233    fn unregister(
234        &self,
235        key: &[u8],
236    ) {
237        // Single index removal (lock-free)
238        self.key_to_expiry.remove(key);
239    }
240
241    /// Check if a key has expired.
242    ///
243    /// # Performance
244    ///
245    /// O(1) - Lock-free DashMap lookup
246    fn is_expired(
247        &self,
248        key: &[u8],
249    ) -> bool {
250        if let Some(expire_at) = self.key_to_expiry.get(key) {
251            *expire_at <= SystemTime::now()
252        } else {
253            false
254        }
255    }
256
257    /// Get all expired keys (without time limit).
258    ///
259    /// This method is rarely used directly. Most cleanup happens via `on_apply()`.
260    ///
261    /// # Performance
262    ///
263    /// O(N) - Iterates all keys with shard read locks
264    fn get_expired_keys(
265        &self,
266        now: SystemTime,
267    ) -> Vec<Bytes> {
268        // Phase 1: collect expired keys (read-only)
269        let to_remove: Vec<Bytes> = self
270            .key_to_expiry
271            .iter()
272            .filter(|entry| *entry.value() <= now)
273            .map(|entry| entry.key().clone())
274            .collect();
275
276        // Phase 2: remove after dropping iter (avoids deadlock)
277        to_remove
278            .into_iter()
279            .filter_map(|key| self.key_to_expiry.remove_if(&key, |_, v| *v <= now).map(|(k, _)| k))
280            .collect()
281    }
282
283    /// Piggyback cleanup on apply operations (DEPRECATED - no longer used).
284    ///
285    /// This method is kept for backward compatibility but is no longer called.
286    /// Cleanup is now handled by:
287    /// - Lazy strategy: cleanup in get() method
288    /// - Background strategy: dedicated async task
289    ///
290    /// # Performance
291    ///
292    /// - Fast path: O(1) - always returns empty vec
293    fn on_apply(&self) -> Vec<Bytes> {
294        // No-op: piggyback cleanup removed to avoid blocking apply path
295        vec![]
296    }
297
298    /// Check if any keys have been registered.
299    ///
300    /// # Performance
301    ///
302    /// O(1) - Single atomic load
303    fn has_lease_keys(&self) -> bool {
304        self.has_keys.load(Ordering::Relaxed)
305    }
306
307    /// Quick check if there might be expired keys.
308    ///
309    /// This is a heuristic check - samples first 10 entries.
310    /// May return false negatives but never false positives.
311    ///
312    /// # Performance
313    ///
314    /// O(1) - Checks first few entries only
315    fn may_have_expired_keys(
316        &self,
317        now: SystemTime,
318    ) -> bool {
319        if !self.has_lease_keys() {
320            return false;
321        }
322
323        // Quick check: iterate first 10 entries
324        // DashMap::iter().take(10) is cheap (early termination)
325        for entry in self.key_to_expiry.iter().take(10) {
326            if *entry.value() <= now {
327                return true;
328            }
329        }
330
331        false
332    }
333
334    /// Get total number of keys with active leases.
335    ///
336    /// # Performance
337    ///
338    /// O(1) - DashMap maintains internal count
339    fn len(&self) -> usize {
340        self.key_to_expiry.len()
341    }
342
343    /// Serialize current lease state to snapshot.
344    ///
345    /// # Performance
346    ///
347    /// O(N) - Iterates all keys with shard read locks
348    fn to_snapshot(&self) -> Vec<u8> {
349        let snapshot = LeaseSnapshot {
350            key_to_expiry: self
351                .key_to_expiry
352                .iter()
353                .map(|entry| (entry.key().to_vec(), *entry.value()))
354                .collect(),
355        };
356        bincode::serialize(&snapshot).unwrap_or_default()
357    }
358
359    /// Reload lease state from snapshot.
360    ///
361    /// Filters out already-expired keys during restoration.
362    ///
363    /// # Performance
364    ///
365    /// O(N) - Rebuilds single index
366    fn reload(
367        &self,
368        data: &[u8],
369    ) -> Result<()> {
370        let snapshot: LeaseSnapshot = bincode::deserialize(data).map_err(|e| {
371            crate::Error::System(d_engine_core::SystemError::Storage(
372                d_engine_core::StorageError::StateMachineError(format!(
373                    "Failed to deserialize lease snapshot: {e}"
374                )),
375            ))
376        })?;
377
378        let now = SystemTime::now();
379
380        // Clear existing data
381        self.key_to_expiry.clear();
382        self.apply_counter.store(0, Ordering::Relaxed);
383
384        // Rebuild single index, skipping expired keys
385        for (key, expire_at) in snapshot.key_to_expiry {
386            if expire_at > now {
387                let key_bytes = Bytes::from(key);
388                self.key_to_expiry.insert(key_bytes, expire_at);
389            }
390        }
391
392        // Update has_keys flag
393        if !self.key_to_expiry.is_empty() {
394            self.has_keys.store(true, Ordering::Relaxed);
395        }
396
397        Ok(())
398    }
399}
400
401/// Snapshot-serializable lease state.
402#[derive(Debug, Serialize, Deserialize)]
403struct LeaseSnapshot {
404    key_to_expiry: HashMap<Vec<u8>, SystemTime>,
405}