guts_storage/
hybrid.rs

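//! Hybrid storage with hot/cold tiering.
//!
//! Combines an in-memory hot tier with a persistent cold backend. Every write is
//! persisted to the cold backend. Objects that are read often enough are promoted
//! into the hot tier on read, and the least-accessed hot objects are demoted when
//! the tier is full.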
5
6use crate::{CacheConfig, CachedStorage, GitObject, ObjectId, ObjectStore, Result, StorageError};
7use parking_lot::RwLock;
8use std::collections::HashSet;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12/// Configuration for hybrid storage.
13#[derive(Debug, Clone)]
14pub struct HybridConfig {
15    /// Maximum number of objects in hot storage.
16    pub hot_max_objects: usize,
17    /// Maximum size in bytes for hot storage.
18    pub hot_max_bytes: usize,
19    /// Cache configuration for the warm layer.
20    pub cache_config: CacheConfig,
21    /// Threshold for promoting objects to hot storage.
22    pub promote_threshold: u32,
23    /// Interval for background migration (seconds).
24    pub migration_interval_secs: u64,
25}
26
27impl Default for HybridConfig {
28    fn default() -> Self {
29        Self {
30            hot_max_objects: 10_000,
31            hot_max_bytes: 512 * 1024 * 1024, // 512 MB
32            cache_config: CacheConfig::default(),
33            promote_threshold: 3,
34            migration_interval_secs: 60,
35        }
36    }
37}
38
39/// Access tracking for objects.
40#[derive(Debug, Default)]
41struct AccessTracker {
42    /// Access counts per object.
43    counts: RwLock<std::collections::HashMap<ObjectId, u32>>,
44    /// Total access count.
45    total_accesses: AtomicU64,
46}
47
48impl AccessTracker {
49    fn record_access(&self, id: &ObjectId) -> u32 {
50        self.total_accesses.fetch_add(1, Ordering::Relaxed);
51        let mut counts = self.counts.write();
52        let count = counts.entry(*id).or_insert(0);
53        *count += 1;
54        *count
55    }
56
57    fn get_count(&self, id: &ObjectId) -> u32 {
58        self.counts.read().get(id).copied().unwrap_or(0)
59    }
60
61    fn reset(&self, id: &ObjectId) {
62        self.counts.write().remove(id);
63    }
64}
65
66/// Hybrid storage combining hot in-memory and cold persistent storage.
67pub struct HybridStorage<C> {
68    /// Hot storage (in-memory, frequently accessed).
69    hot: Arc<ObjectStore>,
70    /// Cold storage (persistent).
71    cold: Arc<C>,
72    /// Cache layer on top of cold storage.
73    cache: CachedStorage<Arc<C>>,
74    /// Set of object IDs in hot storage.
75    hot_objects: RwLock<HashSet<ObjectId>>,
76    /// Current size of hot storage in bytes.
77    hot_size: AtomicU64,
78    /// Access tracker.
79    tracker: AccessTracker,
80    /// Configuration.
81    config: HybridConfig,
82    /// Statistics.
83    stats: HybridStats,
84}
85
/// Counters for how lookups were served and how objects moved between tiers.
#[derive(Debug, Default)]
struct HybridStats {
    /// Lookups answered directly by the hot tier.
    hot_hits: AtomicU64,
    /// Lookups that fell back to the cached cold tier.
    hot_misses: AtomicU64,
    /// Objects copied from the cold tier into the hot tier.
    promotions: AtomicU64,
    /// Objects dropped from the hot tier to make room.
    demotions: AtomicU64,
}
94
95impl<C> HybridStorage<C>
96where
97    C: crate::traits::ObjectStoreBackend + Send + Sync + 'static,
98{
99    /// Creates a new hybrid storage.
100    pub fn new(cold: C, config: HybridConfig) -> Self {
101        let cold = Arc::new(cold);
102        let cache = CachedStorage::new(Arc::clone(&cold), config.cache_config.clone());
103
104        Self {
105            hot: Arc::new(ObjectStore::new()),
106            cold,
107            cache,
108            hot_objects: RwLock::new(HashSet::new()),
109            hot_size: AtomicU64::new(0),
110            tracker: AccessTracker::default(),
111            config,
112            stats: HybridStats::default(),
113        }
114    }
115
116    /// Creates with default configuration.
117    pub fn with_defaults(cold: C) -> Self {
118        Self::new(cold, HybridConfig::default())
119    }
120
121    /// Gets an object, checking hot storage first.
122    pub fn get(&self, id: &ObjectId) -> Result<Option<GitObject>> {
123        // Track access
124        let access_count = self.tracker.record_access(id);
125
126        // Check hot storage first
127        if self.hot_objects.read().contains(id) {
128            self.stats.hot_hits.fetch_add(1, Ordering::Relaxed);
129            match self.hot.get(id) {
130                Ok(obj) => return Ok(Some(obj)),
131                Err(StorageError::ObjectNotFound(_)) => {
132                    // Object was evicted, continue to cold
133                }
134                Err(e) => return Err(e),
135            }
136        }
137
138        self.stats.hot_misses.fetch_add(1, Ordering::Relaxed);
139
140        // Check cold storage (through cache)
141        let result = self.cache.get(id)?;
142
143        // Consider promotion if frequently accessed
144        if let Some(ref obj) = result {
145            if access_count >= self.config.promote_threshold {
146                self.try_promote(obj.clone());
147            }
148        }
149
150        Ok(result)
151    }
152
153    /// Puts an object (always goes to hot first, then cold).
154    pub fn put(&self, object: GitObject) -> Result<ObjectId> {
155        let size = object.data.len() as u64;
156        let id = object.id;
157
158        // Always write to cold for durability
159        self.cold.put(object.clone())?;
160
161        // Try to add to hot storage
162        if self.can_add_to_hot(size) {
163            self.hot.put(object);
164            self.hot_objects.write().insert(id);
165            self.hot_size.fetch_add(size, Ordering::Relaxed);
166        }
167
168        Ok(id)
169    }
170
171    /// Checks if an object exists.
172    pub fn contains(&self, id: &ObjectId) -> Result<bool> {
173        if self.hot_objects.read().contains(id) {
174            return Ok(true);
175        }
176        self.cold.contains(id)
177    }
178
179    /// Deletes an object from all tiers.
180    pub fn delete(&self, id: &ObjectId) -> Result<bool> {
181        // Remove from hot
182        if self.hot_objects.write().remove(id) {
183            if let Ok(obj) = self.hot.get(id) {
184                self.hot_size
185                    .fetch_sub(obj.data.len() as u64, Ordering::Relaxed);
186            }
187        }
188
189        // Remove from cache
190        self.cache.invalidate(id);
191
192        // Remove from cold
193        self.cold.delete(id)
194    }
195
196    /// Returns the total number of objects.
197    pub fn len(&self) -> Result<usize> {
198        self.cold.len()
199    }
200
201    /// Returns true if empty.
202    pub fn is_empty(&self) -> Result<bool> {
203        self.cold.is_empty()
204    }
205
206    /// Lists all object IDs.
207    pub fn list_objects(&self) -> Result<Vec<ObjectId>> {
208        self.cold.list_objects()
209    }
210
211    /// Flushes hot storage to cold.
212    pub fn flush(&self) -> Result<()> {
213        // Flush cold storage
214        self.cold.flush()
215    }
216
217    /// Checks if we can add an object to hot storage.
218    fn can_add_to_hot(&self, size: u64) -> bool {
219        let current_size = self.hot_size.load(Ordering::Relaxed);
220        let current_count = self.hot_objects.read().len();
221
222        current_count < self.config.hot_max_objects
223            && current_size + size <= self.config.hot_max_bytes as u64
224    }
225
226    /// Tries to promote an object to hot storage.
227    fn try_promote(&self, object: GitObject) {
228        let size = object.data.len() as u64;
229        let id = object.id;
230
231        // Evict if needed
232        while !self.can_add_to_hot(size) {
233            if !self.evict_one() {
234                return; // Can't evict, give up
235            }
236        }
237
238        self.hot.put(object);
239        self.hot_objects.write().insert(id);
240        self.hot_size.fetch_add(size, Ordering::Relaxed);
241        self.stats.promotions.fetch_add(1, Ordering::Relaxed);
242    }
243
244    /// Evicts one object from hot storage (LRU based on access count).
245    fn evict_one(&self) -> bool {
246        let hot_objects = self.hot_objects.read();
247        if hot_objects.is_empty() {
248            return false;
249        }
250
251        // Find object with lowest access count
252        let victim = hot_objects
253            .iter()
254            .min_by_key(|id| self.tracker.get_count(id))
255            .copied();
256
257        drop(hot_objects);
258
259        if let Some(victim_id) = victim {
260            if let Ok(obj) = self.hot.get(&victim_id) {
261                let size = obj.data.len() as u64;
262                self.hot_objects.write().remove(&victim_id);
263                self.hot_size.fetch_sub(size, Ordering::Relaxed);
264                self.tracker.reset(&victim_id);
265                self.stats.demotions.fetch_add(1, Ordering::Relaxed);
266                return true;
267            }
268        }
269
270        false
271    }
272
273    /// Returns storage statistics.
274    pub fn stats(&self) -> HybridStatsSnapshot {
275        HybridStatsSnapshot {
276            hot_objects: self.hot_objects.read().len(),
277            hot_size_bytes: self.hot_size.load(Ordering::Relaxed),
278            hot_hits: self.stats.hot_hits.load(Ordering::Relaxed),
279            hot_misses: self.stats.hot_misses.load(Ordering::Relaxed),
280            promotions: self.stats.promotions.load(Ordering::Relaxed),
281            demotions: self.stats.demotions.load(Ordering::Relaxed),
282            cache_stats: self.cache.stats(),
283        }
284    }
285}
286
287/// Snapshot of hybrid storage statistics.
288#[derive(Debug, Clone)]
289pub struct HybridStatsSnapshot {
290    pub hot_objects: usize,
291    pub hot_size_bytes: u64,
292    pub hot_hits: u64,
293    pub hot_misses: u64,
294    pub promotions: u64,
295    pub demotions: u64,
296    pub cache_stats: crate::CacheStats,
297}
298
299impl HybridStatsSnapshot {
300    /// Returns the hot storage hit ratio.
301    pub fn hot_hit_ratio(&self) -> f64 {
302        let total = self.hot_hits + self.hot_misses;
303        if total == 0 {
304            0.0
305        } else {
306            self.hot_hits as f64 / total as f64
307        }
308    }
309}
310
311// Implement ObjectStoreBackend for HybridStorage
312impl<C> crate::traits::ObjectStoreBackend for HybridStorage<C>
313where
314    C: crate::traits::ObjectStoreBackend + Send + Sync + 'static,
315{
316    fn put(&self, object: GitObject) -> Result<ObjectId> {
317        HybridStorage::put(self, object)
318    }
319
320    fn get(&self, id: &ObjectId) -> Result<Option<GitObject>> {
321        HybridStorage::get(self, id)
322    }
323
324    fn contains(&self, id: &ObjectId) -> Result<bool> {
325        HybridStorage::contains(self, id)
326    }
327
328    fn delete(&self, id: &ObjectId) -> Result<bool> {
329        HybridStorage::delete(self, id)
330    }
331
332    fn len(&self) -> Result<usize> {
333        HybridStorage::len(self)
334    }
335
336    fn list_objects(&self) -> Result<Vec<ObjectId>> {
337        HybridStorage::list_objects(self)
338    }
339
340    fn flush(&self) -> Result<()> {
341        HybridStorage::flush(self)
342    }
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use crate::traits::ObjectStoreBackend;

    /// Minimal in-memory stand-in for a persistent cold backend.
    ///
    /// `delete` is a no-op that reports `false`; none of the tests below rely on it.
    struct MemoryCold {
        store: ObjectStore,
    }

    impl MemoryCold {
        fn new() -> Self {
            Self {
                store: ObjectStore::new(),
            }
        }
    }

    impl ObjectStoreBackend for MemoryCold {
        fn put(&self, object: GitObject) -> Result<ObjectId> {
            Ok(self.store.put(object))
        }

        fn get(&self, id: &ObjectId) -> Result<Option<GitObject>> {
            match self.store.get(id) {
                Ok(obj) => Ok(Some(obj)),
                Err(StorageError::ObjectNotFound(_)) => Ok(None),
                Err(e) => Err(e),
            }
        }

        fn contains(&self, id: &ObjectId) -> Result<bool> {
            Ok(self.store.contains(id))
        }

        fn delete(&self, _id: &ObjectId) -> Result<bool> {
            Ok(false)
        }

        fn len(&self) -> Result<usize> {
            Ok(self.store.len())
        }

        fn list_objects(&self) -> Result<Vec<ObjectId>> {
            Ok(self.store.list_objects())
        }
    }

    #[test]
    fn test_hybrid_put_get() {
        let hybrid = HybridStorage::with_defaults(MemoryCold::new());

        let obj = GitObject::blob(b"test data".to_vec());
        let id = hybrid.put(obj.clone()).unwrap();

        let retrieved = hybrid.get(&id).unwrap().unwrap();
        assert_eq!(retrieved.id, obj.id);
    }

    #[test]
    fn test_hot_storage_hit() {
        let hybrid = HybridStorage::with_defaults(MemoryCold::new());

        let obj = GitObject::blob(b"hot data".to_vec());
        let id = hybrid.put(obj).unwrap();

        // A small put with default limits is admitted to the hot tier.
        assert!(hybrid.hot_objects.read().contains(&id));

        // So the first read is served from memory.
        hybrid.get(&id).unwrap();

        let stats = hybrid.stats();
        assert_eq!(stats.hot_hits, 1);
    }

    #[test]
    fn test_promotion() {
        let config = HybridConfig {
            promote_threshold: 2,
            ..Default::default()
        };
        let hybrid = HybridStorage::new(MemoryCold::new(), config);

        // Write straight to cold so the object starts outside the hot tier.
        let obj = GitObject::blob(b"promote me".to_vec());
        hybrid.cold.put(obj.clone()).unwrap();

        // First access: count 1 is below the threshold, so no promotion.
        hybrid.get(&obj.id).unwrap();
        assert!(!hybrid.hot_objects.read().contains(&obj.id));

        // Second access: count 2 reaches the threshold, so the object is promoted.
        hybrid.get(&obj.id).unwrap();
        assert!(hybrid.hot_objects.read().contains(&obj.id));

        let stats = hybrid.stats();
        assert_eq!(stats.promotions, 1);
    }

    #[test]
    fn test_hot_admission_limit() {
        // `put` never evicts. Once a hot-tier limit is reached, further objects
        // are stored only in cold.
        let config = HybridConfig {
            hot_max_objects: 2,
            hot_max_bytes: 30,
            ..Default::default()
        };
        let hybrid = HybridStorage::new(MemoryCold::new(), config);

        for i in 0..3 {
            let obj = GitObject::blob(format!("data-{}-pad", i).into_bytes());
            hybrid.put(obj).unwrap();
        }

        // The object-count limit caps the hot tier at two members.
        assert!(hybrid.hot_objects.read().len() <= 2);
        // Every object is still durable in the cold tier.
        assert_eq!(hybrid.len().unwrap(), 3);
    }

    #[test]
    fn test_contains_checks_both_tiers() {
        let hybrid = HybridStorage::with_defaults(MemoryCold::new());

        let hot_id = hybrid.put(GitObject::blob(b"in hot".to_vec())).unwrap();

        let cold_only = GitObject::blob(b"cold only".to_vec());
        hybrid.cold.put(cold_only.clone()).unwrap();

        assert!(hybrid.contains(&hot_id).unwrap());
        assert!(hybrid.contains(&cold_only.id).unwrap());
        assert!(!hybrid.hot_objects.read().contains(&cold_only.id));
    }

    #[test]
    fn test_stats() {
        let hybrid = HybridStorage::with_defaults(MemoryCold::new());

        let id = hybrid.put(GitObject::blob(b"test".to_vec())).unwrap();

        for _ in 0..5 {
            hybrid.get(&id).unwrap();
        }

        let stats = hybrid.stats();
        assert!(stats.hot_objects > 0);
        assert!(stats.hot_hits > 0);
    }

    #[test]
    fn test_hot_hit_ratio() {
        let hybrid = HybridStorage::with_defaults(MemoryCold::new());
        assert_eq!(hybrid.stats().hot_hit_ratio(), 0.0);

        let id = hybrid.put(GitObject::blob(b"ratio".to_vec())).unwrap();
        hybrid.get(&id).unwrap();

        let stats = hybrid.stats();
        assert_eq!(stats.hot_hits, 1);
        assert_eq!(stats.hot_misses, 0);
        assert_eq!(stats.hot_hit_ratio(), 1.0);
    }
}